Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:05 UTC

[14/23] cassandra git commit: Change commitlog and sstables to track dirty and clean intervals.

Change commitlog and sstables to track dirty and clean intervals.

patch by Branimir Lambov; reviewed by Sylvain Lebresne for
CASSANDRA-11828
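
The core of the change: a commit log segment used to track, per table, only a
single highest dirty position and highest clean position, which cannot
represent a flush that failed or was skipped in the middle of a segment. Each
segment now keeps a dirty IntegerInterval and a clean IntegerInterval.Set per
table, and is reclaimable only once the clean set covers the dirty interval.
A minimal sketch of that invariant, using the API as exercised in
CommitLogSegment below (positions are illustrative, and Set.add is assumed to
merge overlapping intervals):

    // the span of this table's mutations within the segment
    IntegerInterval dirty = new IntegerInterval(100, 100);
    dirty.expandToCover(900);              // mutations now span [100, 900]

    // intervals reported clean by successful flushes
    IntegerInterval.Set clean = new IntegerInterval.Set();
    clean.add(0, 500);                     // first flush
    boolean unused = clean.covers(dirty);  // false: [500, 900] still dirty
    clean.add(400, 1000);                  // a later flush
    unused = clean.covers(dirty);          // true: segment can be discarded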


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/904cb5d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/904cb5d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/904cb5d1

Branch: refs/heads/trunk
Commit: 904cb5d10e0de1a6ca89249be8c257ed38a80ef0
Parents: cf85f52
Author: Branimir Lambov <br...@datastax.com>
Authored: Sat May 14 11:31:16 2016 +0300
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:38:37 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  13 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  66 +---
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  18 +-
 .../cassandra/db/commitlog/CommitLog.java       |  11 +-
 .../db/commitlog/CommitLogReplayer.java         |  59 +++-
 .../db/commitlog/CommitLogSegment.java          |  77 ++---
 .../db/commitlog/CommitLogSegmentManager.java   |   4 +-
 .../cassandra/db/commitlog/IntervalSet.java     | 192 +++++++++++
 .../cassandra/db/commitlog/ReplayPosition.java  |  71 ----
 .../compaction/AbstractCompactionStrategy.java  |   3 +
 .../compaction/CompactionStrategyManager.java   |   3 +
 .../apache/cassandra/db/lifecycle/Tracker.java  |  44 +--
 .../org/apache/cassandra/db/lifecycle/View.java |  36 +-
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  14 +-
 .../metadata/LegacyMetadataSerializer.java      |  17 +-
 .../io/sstable/metadata/MetadataCollector.java  |  38 +--
 .../io/sstable/metadata/StatsMetadata.java      |  44 +--
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
 .../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mc_clust/mc-1-big-Data.db            | Bin 0 -> 5355 bytes
 .../legacy_mc_clust/mc-1-big-Digest.crc32       |   1 +
 .../legacy_mc_clust/mc-1-big-Filter.db          | Bin 0 -> 24 bytes
 .../legacy_mc_clust/mc-1-big-Index.db           | Bin 0 -> 157553 bytes
 .../legacy_mc_clust/mc-1-big-Statistics.db      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust/mc-1-big-Summary.db         | Bin 0 -> 47 bytes
 .../legacy_mc_clust/mc-1-big-TOC.txt            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_mc_clust_compact/mc-1-big-Data.db    | Bin 0 -> 5382 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_compact/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_compact/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7086 bytes
 .../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_compact/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../legacy_mc_clust_counter/mc-1-big-Data.db    | Bin 0 -> 4631 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_clust_counter/mc-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mc_clust_counter/mc-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mc_clust_counter/mc-1-big-TOC.txt    |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 4625 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 7095 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple/mc-1-big-Data.db           | Bin 0 -> 89 bytes
 .../legacy_mc_simple/mc-1-big-Digest.crc32      |   1 +
 .../legacy_mc_simple/mc-1-big-Filter.db         | Bin 0 -> 24 bytes
 .../legacy_mc_simple/mc-1-big-Index.db          | Bin 0 -> 26 bytes
 .../legacy_mc_simple/mc-1-big-Statistics.db     | Bin 0 -> 4639 bytes
 .../legacy_mc_simple/mc-1-big-Summary.db        | Bin 0 -> 47 bytes
 .../legacy_mc_simple/mc-1-big-TOC.txt           |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_compact/mc-1-big-Data.db   | Bin 0 -> 91 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_compact/mc-1-big-Index.db  | Bin 0 -> 26 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4680 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_compact/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mc_simple_counter/mc-1-big-Data.db   | Bin 0 -> 110 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mc_simple_counter/mc-1-big-Index.db  | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4648 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mc_simple_counter/mc-1-big-TOC.txt   |   8 +
 .../mc-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../mc-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../mc-1-big-Digest.crc32                       |   1 +
 .../mc-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mc-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../mc-1-big-Statistics.db                      | Bin 0 -> 4689 bytes
 .../mc-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mc-1-big-TOC.txt                            |   8 +
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 test/unit/org/apache/cassandra/Util.java        |  21 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |  12 +-
 .../apache/cassandra/cql3/OutOfSpaceTest.java   |  33 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 159 ++++++++-
 .../cassandra/db/compaction/NeverPurgeTest.java |   6 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  12 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../metadata/MetadataSerializerTest.java        |  16 +-
 .../cassandra/utils/IntegerIntervalsTest.java   | 326 +++++++++++++++++++
 98 files changed, 1229 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046c8b3..b596fc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
  * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
  * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
  * Fix upgrade of super columns on thrift (CASSANDRA-12335)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..bc733d7 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
@@ -101,6 +103,17 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     }
 
     /**
+     * Testing only!
+     * Clear the set of unwritable directories.
+     */
+    @VisibleForTesting
+    public static void clearUnwritableUnsafe()
+    {
+        instance.unwritableDirectories.clear();
+    }
+
+
+    /**
      * Tells whether or not the directory is blacklisted for reads.
      * @return whether or not the directory is blacklisted for reads.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 400fd36..82604e2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -179,9 +179,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    @VisibleForTesting
-    public static volatile ColumnFamilyStore discardFlushResults;
-
     public final Keyspace keyspace;
     public final String name;
     public final CFMetaData metadata;
@@ -926,25 +923,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
         volatile FSWriteError flushFailure = null;
-        final ReplayPosition commitLogUpperBound;
         final List<Memtable> memtables;
-        final List<Collection<SSTableReader>> readers;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
-                          List<Memtable> memtables, List<Collection<SSTableReader>> readers)
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
+                          List<Memtable> memtables)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
-            this.commitLogUpperBound = commitLogUpperBound;
             this.memtables = memtables;
-            this.readers = readers;
         }
 
         public ReplayPosition call()
         {
-            if (discardFlushResults == ColumnFamilyStore.this)
-                return commitLogUpperBound;
-
             writeBarrier.await();
 
             /**
@@ -968,17 +958,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new IllegalStateException();
             }
 
+            ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-            if (flushFailure == null)
+            if (flushFailure == null && !memtables.isEmpty())
             {
-                CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
-                for (int i = 0 ; i < memtables.size() ; i++)
-                {
-                    Memtable memtable = memtables.get(i);
-                    Collection<SSTableReader> reader = readers.get(i);
-                    memtable.cfs.data.permitCompactionOfFlushed(reader);
-                    memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
-                }
+                Memtable memtable = memtables.get(0);
+                commitLogUpperBound = memtable.getCommitLogUpperBound();
+                CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound);
             }
 
             metric.pendingFlushes.dec();
@@ -1002,7 +988,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final OpOrder.Barrier writeBarrier;
         final List<Memtable> memtables = new ArrayList<>();
-        final List<Collection<SSTableReader>> readers = new ArrayList<>();
         final PostFlush postFlush;
         final boolean truncate;
 
@@ -1044,7 +1029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
+            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
         }
 
         public void run()
@@ -1063,8 +1048,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 memtable.cfs.data.markFlushing(memtable);
                 if (memtable.isClean() || truncate)
                 {
-                    memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
-                    memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
+                    memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
                     reclaim(memtable);
                     iter.remove();
                 }
@@ -1077,9 +1061,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = memtable.flush();
-                    memtable.cfs.data.replaceFlushed(memtable, readers);
+                    memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
-                    this.readers.add(readers);
                 }
             }
             catch (FSWriteError e)
@@ -1126,21 +1109,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    @VisibleForTesting
-    // this method should ONLY be used for testing commit log behaviour; it discards the current memtable
-    // contents without marking the commit log clean, and prevents any proceeding flushes from marking
-    // the commit log as done, however they *will* terminate (unlike under typical failures) to ensure progress is made
-    public void simulateFailedFlush()
-    {
-        discardFlushResults = this;
-        data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this)));
-    }
-
-    public void resumeFlushing()
-    {
-        discardFlushResults = null;
-    }
-
     /**
      * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
      * queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
@@ -1483,16 +1451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data;
     }
 
-    public Collection<SSTableReader> getSSTables()
-    {
-        return data.getSSTables();
-    }
-
-    public Iterable<SSTableReader> getPermittedToCompactSSTables()
-    {
-        return data.getPermittedToCompact();
-    }
-
     public Set<SSTableReader> getLiveSSTables()
     {
         return data.getView().liveSSTables();
@@ -2032,7 +1990,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         long now = System.currentTimeMillis();
         // make sure none of our sstables are somehow in the future (clock drift, perhaps)
         for (ColumnFamilyStore cfs : concatWithIndexes())
-            for (SSTableReader sstable : cfs.data.getSSTables())
+            for (SSTableReader sstable : cfs.getLiveSSTables())
                 now = Math.max(now, sstable.maxDataAge);
         truncatedAt = now;
 
@@ -2130,7 +2088,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             public LifecycleTransaction call() throws Exception
             {
                 assert data.getCompacting().isEmpty() : data.getCompacting();
-                Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+                Iterable<SSTableReader> sstables = getLiveSSTables();
                 sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
                 sstables = ImmutableList.copyOf(sstables);
                 LifecycleTransaction modifier = data.tryModify(sstables, operationType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 01ffd52..877f984 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -385,7 +385,7 @@ public class Directories
 
         if (candidates.isEmpty())
             if (tooBig)
-                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
+                throw new FSWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
             else
                 throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 93dc5af..3c77092 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
@@ -193,6 +194,11 @@ public class Memtable implements Comparable<Memtable>
         return commitLogLowerBound.get();
     }
 
+    public ReplayPosition getCommitLogUpperBound()
+    {
+        return commitLogUpperBound.get();
+    }
+
     public boolean isLive()
     {
         return allocator.isLive();
@@ -331,6 +337,15 @@ public class Memtable implements Comparable<Memtable>
         return minTimestamp;
     }
 
+    /**
+     * For testing only. Give this memtable too big a size to make it always fail flushing.
+     */
+    @VisibleForTesting
+    public void makeUnflushable()
+    {
+        liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+    }
+
     private long estimatedSize()
     {
         long keySize = 0;
@@ -418,8 +433,7 @@ public class Memtable implements Comparable<Memtable>
         {
             txn = LifecycleTransaction.offline(OperationType.FLUSH);
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
-                                                         .commitLogLowerBound(commitLogLowerBound.get())
-                                                         .commitLogUpperBound(commitLogUpperBound.get());
+                    .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get()));
 
             return new SSTableTxnWriter(txn,
                                         cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index dcdd855..dfe3f91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -290,11 +290,12 @@ public class CommitLog implements CommitLogMBean
      * given. Discards any commit log segments that are no longer used.
      *
      * @param cfId    the column family ID that was flushed
-     * @param context the replay position of the flush
+     * @param lowerBound the lowest covered replay position of the flush
+     * @param upperBound the highest covered replay position of the flush
      */
-    public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
+    public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound)
     {
-        logger.trace("discard completed log segments for {}, table {}", context, cfId);
+        logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId);
 
         // Go thru the active segment files, which are ordered oldest to newest, marking the
         // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
@@ -303,7 +304,7 @@ public class CommitLog implements CommitLogMBean
         for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
         {
             CommitLogSegment segment = iter.next();
-            segment.markClean(cfId, context);
+            segment.markClean(cfId, lowerBound, upperBound);
 
             if (segment.isUnused())
             {
@@ -318,7 +319,7 @@ public class CommitLog implements CommitLogMBean
 
             // Don't mark or try to delete any newer segments once we've reached the one containing the
             // position of the flush.
-            if (segment.contains(context))
+            if (segment.contains(upperBound))
                 break;
         }
     }
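
With the interval-based API a flush marks clean only the range it actually
covered, so a segment that still holds data from an earlier failed flush stays
dirty. A hedged sketch of the call site, with the bounds taken from the
flushed memtable as in PostFlush above:

    // after a memtable flush succeeds:
    ReplayPosition lower = memtable.getCommitLogLowerBound();
    ReplayPosition upper = memtable.getCommitLogUpperBound();
    CommitLog.instance.discardCompletedSegments(cfId, lower, upper);
    // each active segment then runs markClean(cfId, lower, upper),
    // clipping the range to its own id before adding it to its clean set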

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index f45a47a..af8efb4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -35,6 +35,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
@@ -52,6 +53,7 @@ import org.apache.cassandra.io.util.FileSegmentInputStream;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -74,7 +76,7 @@ public class CommitLogReplayer
     private final List<Future<?>> futures;
     private final Map<UUID, AtomicInteger> invalidMutations;
     private final AtomicInteger replayedCount;
-    private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
+    private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
     private final ReplayPosition globalPosition;
     private final CRC32 checksum;
     private byte[] buffer;
@@ -83,7 +85,7 @@ public class CommitLogReplayer
     private final ReplayFilter replayFilter;
     private final CommitLogArchiver archiver;
 
-    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
+    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter)
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayList<Future<?>>();
@@ -101,10 +103,9 @@ public class CommitLogReplayer
 
     public static CommitLogReplayer construct(CommitLog commitLog)
     {
-        // compute per-CF and global replay positions
-        Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
+        // compute per-CF and global replay intervals
+        Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>();
         ReplayFilter replayFilter = ReplayFilter.create();
-        ReplayPosition globalPosition = null;
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // but, if we've truncated the cf in question, then we need to start replay after the truncation
@@ -129,14 +130,10 @@ public class CommitLogReplayer
                 }
             }
 
-            ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
-            if (!filter.isEmpty())
-                cfPersisted.put(cfs.metadata.cfId, filter);
-            else
-                globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
+            IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
+            cfPersisted.put(cfs.metadata.cfId, filter);
         }
-        if (globalPosition == null)
-            globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
+        ReplayPosition globalPosition = firstNotCovered(cfPersisted.values());
         logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
         return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
     }
@@ -148,6 +145,41 @@ public class CommitLogReplayer
             recover(clogs[i], i + 1 == clogs.length);
     }
 
+    /**
+     * A set of known safe-to-discard commit log replay positions, based on
+     * the ranges covered by on-disk sstables and the positions prior to the most recent truncation record
+     */
+    public static IntervalSet<ReplayPosition> persistedIntervals(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
+    {
+        IntervalSet.Builder<ReplayPosition> builder = new IntervalSet.Builder<>();
+        for (SSTableReader reader : onDisk)
+            builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
+
+        if (truncatedAt != null)
+            builder.add(ReplayPosition.NONE, truncatedAt);
+        return builder.build();
+    }
+
+    /**
+     * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
+     *
+     * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
+     * given table was constructed* and hence we can start replay from the end of that interval.
+     *
+     * If such an interval is not known, we must replay from the beginning.
+     *
+     * * This assumption fails only if the very first flush of a table stalled or failed, while a later flush
+     *   succeeded. The chances of this happening are very low, and if the assumption does prove to be incorrect
+     *   during replay, the affected deployment is unlikely to be a production one.
+     */
+    public static ReplayPosition firstNotCovered(Collection<IntervalSet<ReplayPosition>> ranges)
+    {
+        return ranges.stream()
+                .map(intervals -> Iterables.getFirst(intervals.ends(), ReplayPosition.NONE)) 
+                .min(Ordering.natural())
+                .get(); // iteration is per known-CF, there must be at least one. 
+    }
+
     public int blockForWrites()
     {
         for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
@@ -293,8 +325,7 @@ public class CommitLogReplayer
      */
     private boolean shouldReplay(UUID cfId, ReplayPosition position)
     {
-        ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
-        return filter == null || filter.shouldReplay(position);
+        return !cfPersisted.get(cfId).contains(position);
     }
 
     @SuppressWarnings("resource")
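
To make the replay-start computation concrete, a small worked example
(segment and position values are made up; the types are the ones added by
this patch):

    // table A: two sstables whose commit log ranges overlap and merge
    IntervalSet.Builder<ReplayPosition> a = new IntervalSet.Builder<>();
    a.add(new ReplayPosition(1, 0), new ReplayPosition(1, 4096));
    a.add(new ReplayPosition(1, 4000), new ReplayPosition(2, 128));
    IntervalSet<ReplayPosition> tableA = a.build();   // [(1,0)..(2,128)]

    // table B has flushed less
    IntervalSet<ReplayPosition> tableB =
        new IntervalSet<>(new ReplayPosition(1, 0), new ReplayPosition(1, 512));

    // replay begins at the earliest end of each table's first interval
    ReplayPosition start =
        CommitLogReplayer.firstNotCovered(Arrays.asList(tableA, tableB));
    // start == (1, 512); anything already covered for a given table is then
    // skipped per-mutation via shouldReplay -> cfPersisted.contains(position)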

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 27c05b4..d2f12bf 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.IntegerInterval;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
@@ -101,11 +102,11 @@ public abstract class CommitLogSegment
     // a signal for writers to wait on to confirm the log message they provided has been written to disk
     private final WaitQueue syncComplete = new WaitQueue();
 
-    // a map of Cf->dirty position; this is used to permit marking Cfs clean whilst the log is still in use
-    private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024);
+    // a map of Cf->dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data
+    private final NonBlockingHashMap<UUID, IntegerInterval> cfDirty = new NonBlockingHashMap<>(1024);
 
-    // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use
-    private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>();
+    // a map of Cf->clean intervals; separate map from above to permit marking Cfs clean whilst the log is still in use
+    private final ConcurrentHashMap<UUID, IntegerInterval.Set> cfClean = new ConcurrentHashMap<>();
 
     public final long id;
 
@@ -423,10 +424,23 @@ public abstract class CommitLogSegment
         }
     }
 
+    public static <K> void coverInMap(ConcurrentMap<K, IntegerInterval> map, K key, int value)
+    {
+        IntegerInterval i = map.get(key);
+        if (i == null)
+        {
+            i = map.putIfAbsent(key, new IntegerInterval(value, value));
+            if (i == null)
+                // success
+                return;
+        }
+        i.expandToCover(value);
+    }
+
     void markDirty(Mutation mutation, int allocatedPosition)
     {
         for (PartitionUpdate update : mutation.getPartitionUpdates())
-            ensureAtleast(cfDirty, update.metadata().cfId, allocatedPosition);
+            coverInMap(cfDirty, update.metadata().cfId, allocatedPosition);
     }
 
     /**
@@ -437,55 +451,32 @@ public abstract class CommitLogSegment
      * @param cfId    the column family ID that is now clean
      * @param context the optional clean offset
      */
-    public synchronized void markClean(UUID cfId, ReplayPosition context)
+    public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition)
     {
+        if (startPosition.segment > id || endPosition.segment < id)
+            return;
         if (!cfDirty.containsKey(cfId))
             return;
-        if (context.segment == id)
-            markClean(cfId, context.position);
-        else if (context.segment > id)
-            markClean(cfId, Integer.MAX_VALUE);
-    }
-
-    private void markClean(UUID cfId, int position)
-    {
-        ensureAtleast(cfClean, cfId, position);
+        int start = startPosition.segment == id ? startPosition.position : 0;
+        int end = endPosition.segment == id ? endPosition.position : Integer.MAX_VALUE;
+        cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
         removeCleanFromDirty();
     }
 
-    private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value)
-    {
-        AtomicInteger i = map.get(cfId);
-        if (i == null)
-        {
-            AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
-            if (i2 != null)
-                i = i2;
-        }
-        while (true)
-        {
-            int cur = i.get();
-            if (cur > value)
-                break;
-            if (i.compareAndSet(cur, value))
-                break;
-        }
-    }
-
     private void removeCleanFromDirty()
     {
         // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely
         if (isStillAllocating())
             return;
 
-        Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator();
+        Iterator<Map.Entry<UUID, IntegerInterval.Set>> iter = cfClean.entrySet().iterator();
         while (iter.hasNext())
         {
-            Map.Entry<UUID, AtomicInteger> clean = iter.next();
+            Map.Entry<UUID, IntegerInterval.Set> clean = iter.next();
             UUID cfId = clean.getKey();
-            AtomicInteger cleanPos = clean.getValue();
-            AtomicInteger dirtyPos = cfDirty.get(cfId);
-            if (dirtyPos != null && dirtyPos.intValue() <= cleanPos.intValue())
+            IntegerInterval.Set cleanSet = clean.getValue();
+            IntegerInterval dirtyInterval = cfDirty.get(cfId);
+            if (dirtyInterval != null && cleanSet.covers(dirtyInterval))
             {
                 cfDirty.remove(cfId);
                 iter.remove();
@@ -502,12 +493,12 @@ public abstract class CommitLogSegment
             return cfDirty.keySet();
 
         List<UUID> r = new ArrayList<>(cfDirty.size());
-        for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
+        for (Map.Entry<UUID, IntegerInterval> dirty : cfDirty.entrySet())
         {
             UUID cfId = dirty.getKey();
-            AtomicInteger dirtyPos = dirty.getValue();
-            AtomicInteger cleanPos = cfClean.get(cfId);
-            if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
+            IntegerInterval dirtyInterval = dirty.getValue();
+            IntegerInterval.Set cleanSet = cfClean.get(cfId);
+            if (cleanSet == null || !cleanSet.covers(dirtyInterval))
                 r.add(dirty.getKey());
         }
         return r;
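
The dirty side is updated lock-free via coverInMap, which plays the role of
the removed ensureAtleast but grows an interval instead of bumping a single
watermark. A short usage sketch (illustrative positions):

    NonBlockingHashMap<UUID, IntegerInterval> dirty = new NonBlockingHashMap<>();
    UUID cfId = UUID.randomUUID();
    CommitLogSegment.coverInMap(dirty, cfId, 128);   // creates [128, 128]
    CommitLogSegment.coverInMap(dirty, cfId, 4096);  // expands to [128, 4096]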

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 66ad6a3..82cee50 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -310,7 +310,7 @@ public class CommitLogSegmentManager
 
             for (CommitLogSegment segment : activeSegments)
                 for (UUID cfId : droppedCfs)
-                    segment.markClean(cfId, segment.getContext());
+                    segment.markClean(cfId, ReplayPosition.NONE, segment.getContext());
 
             // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
             // if the previous active segment was the only one to recycle (since an active segment isn't
@@ -451,7 +451,7 @@ public class CommitLogSegmentManager
                     // even though we remove the schema entry before a final flush when dropping a CF,
                     // it's still possible for a writer to race and finish his append after the flush.
                     logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                    segment.markClean(dirtyCFId, segment.getContext());
+                    segment.markClean(dirtyCFId, ReplayPosition.NONE, segment.getContext());
                 }
                 else if (!flushes.containsKey(dirtyCFId))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
new file mode 100644
index 0000000..bd0ea22
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
@@ -0,0 +1,192 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * An immutable set of closed intervals, stored in normalized form (i.e. where overlapping intervals are converted
+ * to a single interval covering both).
+ *
+ * The set is stored as a sorted map from interval starts to the corresponding end. The map satisfies
+ *   curr().getKey() <= curr().getValue() < next().getKey()
+ */
+public class IntervalSet<T extends Comparable<T>>
+{
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static final IntervalSet EMPTY = new IntervalSet(ImmutableSortedMap.of());
+
+    final private NavigableMap<T, T> ranges;
+
+    private IntervalSet(ImmutableSortedMap<T, T> ranges)
+    {
+        this.ranges = ranges;
+    }
+
+    /**
+     * Construct new set containing the interval with the given start and end position.
+     */
+    public IntervalSet(T start, T end)
+    {
+        this(ImmutableSortedMap.of(start, end));
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Comparable<T>> IntervalSet<T> empty()
+    {
+        return (IntervalSet<T>) EMPTY;
+    }
+
+    public boolean contains(T position)
+    {
+        // closed (i.e. inclusive) intervals
+        Map.Entry<T, T> range = ranges.floorEntry(position);
+        return range != null && position.compareTo(range.getValue()) <= 0;
+    }
+
+    public boolean isEmpty()
+    {
+        return ranges.isEmpty();
+    }
+
+    public Optional<T> lowerBound()
+    {
+        return isEmpty() ? Optional.empty() : Optional.of(ranges.firstKey());
+    }
+
+    public Optional<T> upperBound()
+    {
+        return isEmpty() ? Optional.empty() : Optional.of(ranges.lastEntry().getValue());
+    }
+
+    public Collection<T> starts()
+    {
+        return ranges.keySet();
+    }
+
+    public Collection<T> ends()
+    {
+        return ranges.values();
+    }
+
+    public String toString()
+    {
+        return ranges.toString();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return ranges.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        return obj instanceof IntervalSet && ranges.equals(((IntervalSet<?>) obj).ranges);
+    }
+
+    public static final <T extends Comparable<T>> ISerializer<IntervalSet<T>> serializer(ISerializer<T> pointSerializer)
+    {
+        return new ISerializer<IntervalSet<T>>()
+        {
+            public void serialize(IntervalSet<T> intervals, DataOutputPlus out) throws IOException
+            {
+                out.writeInt(intervals.ranges.size());
+                for (Map.Entry<T, T> en : intervals.ranges.entrySet())
+                {
+                    pointSerializer.serialize(en.getKey(), out);
+                    pointSerializer.serialize(en.getValue(), out);
+                }
+            }
+    
+            public IntervalSet<T> deserialize(DataInputPlus in) throws IOException
+            {
+                int count = in.readInt();
+                NavigableMap<T, T> ranges = new TreeMap<>();
+                for (int i = 0; i < count; ++i)
+                    ranges.put(pointSerializer.deserialize(in), pointSerializer.deserialize(in));
+                return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges));
+            }
+    
+            public long serializedSize(IntervalSet<T> intervals)
+            {
+                long size = TypeSizes.sizeof(intervals.ranges.size());
+                for (Map.Entry<T, T> en : intervals.ranges.entrySet())
+                {
+                    size += pointSerializer.serializedSize(en.getKey());
+                    size += pointSerializer.serializedSize(en.getValue());
+                }
+                return size;
+            }
+        };
+    };
+
+    /**
+     * Builder of interval sets, applying the necessary normalization while adding ranges.
+     *
+     * Data is stored as above, as a sorted map from interval starts to the corresponding end, which satisfies
+     *   curr().getKey() <= curr().getValue() < next().getKey()
+     */
+    public static class Builder<T extends Comparable<T>>
+    {
+        final NavigableMap<T, T> ranges;
+
+        public Builder()
+        {
+            this.ranges = new TreeMap<>();
+        }
+
+        public Builder(T start, T end)
+        {
+            this();
+            assert start.compareTo(end) <= 0;
+            ranges.put(start, end);
+        }
+
+        /**
+         * Add an interval to the set and perform normalization.
+         */
+        public void add(T start, T end)
+        {
+            assert start.compareTo(end) <= 0;
+            // extend ourselves to cover any ranges we overlap
+            // record directly preceding our end may extend past us, so take the max of our end and its
+            Map.Entry<T, T> extend = ranges.floorEntry(end);
+            if (extend != null && extend.getValue().compareTo(end) > 0)
+                end = extend.getValue();
+
+            // record directly preceding our start may extend into us; if it does, we take it as our start
+            extend = ranges.lowerEntry(start);
+            if (extend != null && extend.getValue().compareTo(start) >= 0)
+                start = extend.getKey();
+
+            // remove all covered intervals
+            // since we have adjusted start and end to cover the ones that would be only partially covered, we
+            // are certain that anything whose start falls within the span is completely covered
+            ranges.subMap(start, end).clear();
+            // add the new interval
+            ranges.put(start, end);
+        }
+
+        public void addAll(IntervalSet<T> otherSet)
+        {
+            for (Map.Entry<T, T> en : otherSet.ranges.entrySet())
+            {
+                add(en.getKey(), en.getValue());
+            }
+        }
+
+        public IntervalSet<T> build()
+        {
+            return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges));
+        }
+    }
+
+}
\ No newline at end of file
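
The normalization in Builder.add maintains the documented invariant
(curr.getKey() <= curr.getValue() < next.getKey()). A quick example of the
resulting behaviour:

    IntervalSet.Builder<Integer> builder = new IntervalSet.Builder<>();
    builder.add(1, 5);
    builder.add(4, 9);               // overlaps [1, 5] -> merged into [1, 9]
    builder.add(20, 30);
    IntervalSet<Integer> set = builder.build();

    set.contains(9);                 // true  -- intervals are closed
    set.contains(15);                // false -- falls in the gap
    set.lowerBound();                // Optional.of(1)
    set.upperBound();                // Optional.of(30)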

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 0b21763..b0214b8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -18,15 +18,9 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
@@ -43,71 +37,6 @@ public class ReplayPosition implements Comparable<ReplayPosition>
     public final long segment;
     public final int position;
 
-    /**
-     * A filter of known safe-to-discard commit log replay positions, based on
-     * the range covered by on disk sstables and those prior to the most recent truncation record
-     */
-    public static class ReplayFilter
-    {
-        final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
-        public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
-        {
-            for (SSTableReader reader : onDisk)
-            {
-                ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
-                ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
-                add(persisted, start, end);
-            }
-            if (truncatedAt != null)
-                add(persisted, ReplayPosition.NONE, truncatedAt);
-        }
-
-        private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
-        {
-            // extend ourselves to cover any ranges we overlap
-            // record directly preceding our end may extend past us, so take the max of our end and its
-            Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
-            if (extend != null && extend.getValue().compareTo(end) > 0)
-                end = extend.getValue();
-
-            // record directly preceding our start may extend into us; if it does, we take it as our start
-            extend = ranges.lowerEntry(start);
-            if (extend != null && extend.getValue().compareTo(start) >= 0)
-                start = extend.getKey();
-
-            ranges.subMap(start, end).clear();
-            ranges.put(start, end);
-        }
-
-        public boolean shouldReplay(ReplayPosition position)
-        {
-            // replay ranges are start exclusive, end inclusive
-            Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
-            return range == null || position.compareTo(range.getValue()) > 0;
-        }
-
-        public boolean isEmpty()
-        {
-            return persisted.isEmpty();
-        }
-    }
-
-    public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
-    {
-        ReplayPosition min = null;
-        for (ReplayFilter map : ranges)
-        {
-            ReplayPosition first = map.persisted.firstEntry().getValue();
-            if (min == null)
-                min = first;
-            else
-                min = Ordering.natural().min(min, first);
-        }
-        if (min == null)
-            return NONE;
-        return min;
-    }
-
     public ReplayPosition(long segment, int position)
     {
         this.segment = segment;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 0dce52b..a80a6f4 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -241,6 +241,9 @@ public abstract class AbstractCompactionStrategy
      */
     public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
+            CompactionManager.instance.submitBackground(cfs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 444d43d..a9bfbd2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -189,6 +189,9 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
+            CompactionManager.instance.submitBackground(cfs);
     }
 
     public int getUnleveledSSTables()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c94b88f..5a3d524 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -48,8 +47,6 @@ import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static com.google.common.base.Predicates.and;
-import static com.google.common.base.Predicates.in;
-import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.ImmutableSet.copyOf;
 import static com.google.common.collect.Iterables.filter;
 import static java.util.Collections.singleton;
@@ -204,7 +201,6 @@ public class Tracker
                          ImmutableList.<Memtable>of(),
                          Collections.<SSTableReader, SSTableReader>emptyMap(),
                          Collections.<SSTableReader, SSTableReader>emptyMap(),
-                         Collections.<SSTableReader>emptySet(),
                          SSTableIntervalTree.empty()));
     }
 
@@ -351,49 +347,19 @@ public class Tracker
 
         Throwable fail;
         fail = updateSizeTracking(emptySet(), sstables, null);
+        // TODO: if we're invalidated, should we notifyAdded AND removed, or just skip both?
+        fail = notifyAdded(sstables, fail);
+
+        if (!isDummy() && !cfstore.isValid())
+            dropSSTables();
 
         maybeFail(fail);
     }
 
-    /**
-     * permit compaction of the provided sstable; this translates to notifying compaction
-     * strategies of its existence, and potentially submitting a background task
-     */
-    public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
-    {
-        if (sstables.isEmpty())
-            return;
-
-        apply(View.permitCompactionOfFlushed(sstables));
-
-        if (isDummy())
-            return;
-
-        if (cfstore.isValid())
-        {
-            notifyAdded(sstables);
-            CompactionManager.instance.submitBackground(cfstore);
-        }
-        else
-        {
-            dropSSTables();
-        }
-    }
 
 
     // MISCELLANEOUS public utility calls
 
-    public Set<SSTableReader> getSSTables()
-    {
-        return view.get().sstables;
-    }
-
-    public Iterable<SSTableReader> getPermittedToCompact()
-    {
-        View view = this.view.get();
-        return filter(view.sstables, not(in(view.premature)));
-    }
-
     public Set<SSTableReader> getCompacting()
     {
         return view.get().compacting;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 3fa197f..4b3aae0 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Interval;
 
 import static com.google.common.base.Predicates.equalTo;
-import static com.google.common.base.Predicates.in;
 import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.ImmutableList.copyOf;
 import static com.google.common.collect.ImmutableList.of;
@@ -70,7 +69,6 @@ public class View
     public final List<Memtable> flushingMemtables;
     final Set<SSTableReader> compacting;
     final Set<SSTableReader> sstables;
-    final Set<SSTableReader> premature;
     // we use a Map here so that we can easily perform identity checks as well as equality checks.
     // When marking compacting, we now  indicate if we expect the sstables to be present (by default we do),
     // and we then check that not only are they all present in the live set, but that the exact instance present is
@@ -80,7 +78,7 @@ public class View
 
     final SSTableIntervalTree intervalTree;
 
-    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
+    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
     {
         assert liveMemtables != null;
         assert flushingMemtables != null;
@@ -95,7 +93,6 @@ public class View
         this.sstables = sstablesMap.keySet();
         this.compactingMap = compacting;
         this.compacting = compactingMap.keySet();
-        this.premature = premature;
         this.intervalTree = intervalTree;
     }
 
@@ -256,7 +253,7 @@ public class View
                 assert all(mark, Helpers.idIn(view.sstablesMap));
                 return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
                                 replace(view.compactingMap, unmark, mark),
-                                view.premature, view.intervalTree);
+                                view.intervalTree);
             }
         };
     }
@@ -270,7 +267,7 @@ public class View
             public boolean apply(View view)
             {
                 for (SSTableReader reader : readers)
-                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
+                    if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
                         return false;
                 return true;
             }
@@ -287,7 +284,7 @@ public class View
             public View apply(View view)
             {
                 Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
-                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature,
+                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }
         };
@@ -302,7 +299,7 @@ public class View
             {
                 List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
                 assert newLive.size() == view.liveMemtables.size() + 1;
-                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
+                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
             }
         };
     }
@@ -321,7 +318,7 @@ public class View
                                                            filter(flushing, not(lessThan(toFlush)))));
                 assert newLive.size() == live.size() - 1;
                 assert newFlushing.size() == flushing.size() + 1;
-                return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
+                return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
             }
         };
     }
@@ -338,32 +335,15 @@ public class View
 
                 if (flushed == null || flushed.isEmpty())
                     return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
-                                    view.compactingMap, view.premature, view.intervalTree);
+                                    view.compactingMap, view.intervalTree);
 
                 Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
-                Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
-                Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
-                return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
+                return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }
         };
     }
 
-    static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers)
-    {
-        Set<SSTableReader> expectAndRemove = ImmutableSet.copyOf(readers);
-        return new Function<View, View>()
-        {
-            public View apply(View view)
-            {
-                Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet());
-                Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet());
-                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree);
-            }
-        };
-    }
-
-
     private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
     {
         return new Predicate<T>()

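For context on the View hunks above: the Tracker holds a single immutable View and
applies every change as a pure Function<View, View>, swapping the result in with a
compare-and-set, so dropping the premature set is just removing one field from that
copy-on-write state. A minimal sketch of the idiom, with illustrative names rather
than the actual Cassandra classes:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Function;

    final class SimpleView
    {
        final Set<String> sstables;

        SimpleView(Set<String> sstables)
        {
            // Defensive copy: every published view is immutable.
            this.sstables = Collections.unmodifiableSet(new HashSet<>(sstables));
        }

        // A transformation builds a brand-new view; it never mutates the old one.
        static Function<SimpleView, SimpleView> add(String sstable)
        {
            return view -> {
                Set<String> next = new HashSet<>(view.sstables);
                next.add(sstable);
                return new SimpleView(next);
            };
        }
    }

    final class SimpleTracker
    {
        private final AtomicReference<SimpleView> view =
            new AtomicReference<>(new SimpleView(Collections.emptySet()));

        // Retry loop: apply the pure function and CAS the result in, so concurrent
        // updaters never observe a half-applied change.
        SimpleView apply(Function<SimpleView, SimpleView> transformation)
        {
            while (true)
            {
                SimpleView current = view.get();
                SimpleView updated = transformation.apply(current);
                if (view.compareAndSet(current, updated))
                    return updated;
            }
        }
    }
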
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index d9e289c..96c5a6e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -72,6 +72,8 @@ public abstract class Version
 
     public abstract boolean hasCommitLogLowerBound();
 
+    public abstract boolean hasCommitLogIntervals();
+
     public String getVersion()
     {
         return version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index e0fb3b1..16f0beb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -111,7 +111,7 @@ public class BigFormat implements SSTableFormat
     // we always incremented the major version.
     static class BigVersion extends Version
     {
-        public static final String current_version = "mb";
+        public static final String current_version = "mc";
         public static final String earliest_supported_version = "jb";
 
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@ -124,7 +124,8 @@ public class BigFormat implements SSTableFormat
         // lb (2.2.7): commit log lower bound included
         // ma (3.0.0): swap bf hash order
         //             store rows natively
-        // mb (3.0.6): commit log lower bound included
+        // mb (3.0.7, 3.7): commit log lower bound included
+        // mc (3.0.8, 3.9): commit log intervals included
         //
         // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
 
@@ -145,6 +146,7 @@ public class BigFormat implements SSTableFormat
          */
         private final boolean hasOldBfHashOrder;
         private final boolean hasCommitLogLowerBound;
+        private final boolean hasCommitLogIntervals;
 
         /**
         * CASSANDRA-7066: compaction ancestors are no longer used and have been removed.
@@ -186,6 +188,7 @@ public class BigFormat implements SSTableFormat
             hasBoundaries = version.compareTo("ma") < 0;
             hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
                                      || version.compareTo("mb") >= 0;
+            hasCommitLogIntervals = version.compareTo("mc") >= 0;
         }
 
         @Override
@@ -248,12 +251,19 @@ public class BigFormat implements SSTableFormat
             return newFileName;
         }
 
+        @Override
         public boolean hasCommitLogLowerBound()
         {
             return hasCommitLogLowerBound;
         }
 
         @Override
+        public boolean hasCommitLogIntervals()
+        {
+            return hasCommitLogIntervals;
+        }
+
+        @Override
         public boolean storeRows()
         {
             return storeRows;

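The capability flags above rely on SSTable version strings ("jb", "ma", "mc", ...)
comparing lexicographically in release order, which is why a plain
version.compareTo("mc") >= 0 suffices to gate the new metadata. A throwaway check
(hypothetical demo class, not part of the patch):

    public class VersionCompareDemo
    {
        public static void main(String[] args)
        {
            // Two-letter version strings sort the same way they were released,
            // so String.compareTo doubles as a feature-availability test.
            String[] versions = { "jb", "ka", "la", "lb", "ma", "mb", "mc" };
            for (String v : versions)
                System.out.printf("%s hasCommitLogIntervals=%b%n", v, v.compareTo("mc") >= 0);
            // Only "mc" (and anything later, e.g. "md") prints true.
        }
    }
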
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4561520..a683513 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -24,6 +24,7 @@ import java.util.*;
 import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -35,6 +36,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.StreamingHistogram;
 
+import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer;
+
 /**
  * Serializer for SSTable from legacy versions
  */
@@ -55,7 +58,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
 
         EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
         EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
-        ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
+        ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
         out.writeLong(stats.minTimestamp);
         out.writeLong(stats.maxTimestamp);
         out.writeInt(stats.maxLocalDeletionTime);
@@ -72,7 +75,9 @@ public class LegacyMetadataSerializer extends MetadataSerializer
         for (ByteBuffer value : stats.maxClusteringValues)
             ByteBufferUtil.writeWithShortLength(value, out);
         if (version.hasCommitLogLowerBound())
-            ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
+            ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
+        if (version.hasCommitLogIntervals())
+            replayPositionSetSerializer.serialize(stats.commitLogIntervals, out);
     }
 
     /**
@@ -121,6 +126,11 @@ public class LegacyMetadataSerializer extends MetadataSerializer
 
                 if (descriptor.version.hasCommitLogLowerBound())
                     commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+                IntervalSet<ReplayPosition> commitLogIntervals;
+                if (descriptor.version.hasCommitLogIntervals())
+                    commitLogIntervals = replayPositionSetSerializer.deserialize(in);
+                else
+                    commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound);
 
                 if (types.contains(MetadataType.VALIDATION))
                     components.put(MetadataType.VALIDATION,
@@ -129,8 +139,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                     components.put(MetadataType.STATS,
                                    new StatsMetadata(partitionSizes,
                                                      columnCounts,
-                                                     commitLogLowerBound,
-                                                     commitLogUpperBound,
+                                                     commitLogIntervals,
                                                      minTimestamp,
                                                      maxTimestamp,
                                                      Integer.MAX_VALUE,

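Note the compatibility scheme in the hunks above: the legacy upper bound (and, for
"mb"-era files, the lower bound) is still written unconditionally, and the full
interval set is only appended for "mc"+ files; on the read side, files without
intervals degrade to the single [lowerBound, upperBound] span. A self-contained
sketch of that write-old-then-new layout using plain java.io streams (the field
layout and names are illustrative, not Cassandra's actual on-disk format):

    import java.io.*;

    public class VersionGatedLayout
    {
        // Older fields first and unconditionally, newer fields behind version
        // flags: a reader built for version N consumes exactly the bytes a
        // writer for version N produced, and older readers simply stop early.
        static void write(DataOutputStream out, boolean hasLowerBound, boolean hasIntervals,
                          long upper, long lower, long[] intervalBounds) throws IOException
        {
            out.writeLong(upper);                    // understood by every version
            if (hasLowerBound)
                out.writeLong(lower);                // "lb"/"mb"-era addition
            if (hasIntervals)
            {
                out.writeInt(intervalBounds.length); // "mc"-era addition
                for (long bound : intervalBounds)
                    out.writeLong(bound);
            }
        }

        static long[] read(DataInputStream in, boolean hasLowerBound, boolean hasIntervals) throws IOException
        {
            long upper = in.readLong();
            long lower = hasLowerBound ? in.readLong() : Long.MIN_VALUE;
            if (!hasIntervals)
                return new long[]{ lower, upper };   // fall back to one [lower, upper] span
            long[] bounds = new long[in.readInt()];
            for (int i = 0; i < bounds.length; i++)
                bounds[i] = in.readLong();
            return bounds;
        }
    }
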
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 53cf0b0..1ff2ca8 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -27,12 +27,11 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.db.rows.Cell;
@@ -69,8 +68,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         return new StatsMetadata(defaultPartitionSizeHistogram(),
                                  defaultCellPerPartitionCountHistogram(),
-                                 ReplayPosition.NONE,
-                                 ReplayPosition.NONE,
+                                 IntervalSet.empty(),
                                  Long.MIN_VALUE,
                                  Long.MAX_VALUE,
                                  Integer.MAX_VALUE,
@@ -91,8 +89,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
     // TODO: count the number of rows per partition (either with the number of cells, or instead)
     protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
-    protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
-    protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
+    protected IntervalSet<ReplayPosition> commitLogIntervals = IntervalSet.empty();
     protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
     protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
     protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@ -126,23 +123,13 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         this(comparator);
 
-        ReplayPosition min = null, max = null;
+        IntervalSet.Builder<ReplayPosition> intervals = new IntervalSet.Builder<>();
         for (SSTableReader sstable : sstables)
         {
-            if (min == null)
-            {
-                min = sstable.getSSTableMetadata().commitLogLowerBound;
-                max = sstable.getSSTableMetadata().commitLogUpperBound;
-            }
-            else
-            {
-                min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
-                max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
-            }
+            intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
         }
 
-        commitLogLowerBound(min);
-        commitLogUpperBound(max);
+        commitLogIntervals(intervals.build());
         sstableLevel(level);
     }
 
@@ -229,15 +216,9 @@ public class MetadataCollector implements PartitionStatisticsCollector
         ttlTracker.update(newTTL);
     }
 
-    public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
-    {
-        this.commitLogLowerBound = commitLogLowerBound;
-        return this;
-    }
-
-    public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
+    public MetadataCollector commitLogIntervals(IntervalSet<ReplayPosition> commitLogIntervals)
     {
-        this.commitLogUpperBound = commitLogUpperBound;
+        this.commitLogIntervals = commitLogIntervals;
         return this;
     }
 
@@ -302,8 +283,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
         components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
                                                              estimatedCellPerPartitionCount,
-                                                             commitLogLowerBound,
-                                                             commitLogUpperBound,
+                                                             commitLogIntervals,
                                                              timestampTracker.min(),
                                                              timestampTracker.max(),
                                                              localDeletionTimeTracker.min(),

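The Ordering.natural().min/max bookkeeping removed above could not represent gaps:
merging sstables that covered replay positions [1,3] and [7,9] produced the single
range [1,9], making the result appear to cover positions in (3,7) that were never
persisted. An interval union keeps the gap visible. A toy union builder over int
ranges (illustrative only, not Cassandra's IntervalSet.Builder):

    import java.util.*;

    public class RangeUnion
    {
        private final NavigableMap<Integer, Integer> ranges = new TreeMap<>(); // start -> end

        public void add(int start, int end)
        {
            // Absorb the range to the left if it touches or overlaps [start, end].
            Map.Entry<Integer, Integer> left = ranges.floorEntry(start);
            if (left != null && left.getValue() >= start - 1)
            {
                start = left.getKey();
                end = Math.max(end, left.getValue());
            }
            // Absorb every following range that touches or overlaps the growing span.
            Iterator<Map.Entry<Integer, Integer>> it =
                ranges.tailMap(start, true).entrySet().iterator();
            while (it.hasNext())
            {
                Map.Entry<Integer, Integer> e = it.next();
                if (e.getKey() > end + 1)
                    break;
                end = Math.max(end, e.getValue());
                it.remove();
            }
            ranges.put(start, end);
        }

        public static void main(String[] args)
        {
            RangeUnion union = new RangeUnion();
            union.add(1, 3);
            union.add(7, 9);
            System.out.println(union.ranges); // {1=3, 7=9} -- the gap is preserved
            union.add(3, 7);
            System.out.println(union.ranges); // {1=9} -- now genuinely contiguous
        }
    }
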
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 07e35bb..9971eaa 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -22,10 +22,12 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -39,11 +41,11 @@ import org.apache.cassandra.utils.StreamingHistogram;
 public class StatsMetadata extends MetadataComponent
 {
     public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
+    public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer);
 
     public final EstimatedHistogram estimatedPartitionSize;
     public final EstimatedHistogram estimatedColumnCount;
-    public final ReplayPosition commitLogLowerBound;
-    public final ReplayPosition commitLogUpperBound;
+    public final IntervalSet<ReplayPosition> commitLogIntervals;
     public final long minTimestamp;
     public final long maxTimestamp;
     public final int minLocalDeletionTime;
@@ -62,8 +64,7 @@ public class StatsMetadata extends MetadataComponent
 
     public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
-                         ReplayPosition commitLogLowerBound,
-                         ReplayPosition commitLogUpperBound,
+                         IntervalSet<ReplayPosition> commitLogIntervals,
                          long minTimestamp,
                          long maxTimestamp,
                          int minLocalDeletionTime,
@@ -82,8 +83,7 @@ public class StatsMetadata extends MetadataComponent
     {
         this.estimatedPartitionSize = estimatedPartitionSize;
         this.estimatedColumnCount = estimatedColumnCount;
-        this.commitLogLowerBound = commitLogLowerBound;
-        this.commitLogUpperBound = commitLogUpperBound;
+        this.commitLogIntervals = commitLogIntervals;
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
         this.minLocalDeletionTime = minLocalDeletionTime;
@@ -134,8 +134,7 @@ public class StatsMetadata extends MetadataComponent
     {
         return new StatsMetadata(estimatedPartitionSize,
                                  estimatedColumnCount,
-                                 commitLogLowerBound,
-                                 commitLogUpperBound,
+                                 commitLogIntervals,
                                  minTimestamp,
                                  maxTimestamp,
                                  minLocalDeletionTime,
@@ -157,8 +156,7 @@ public class StatsMetadata extends MetadataComponent
     {
         return new StatsMetadata(estimatedPartitionSize,
                                  estimatedColumnCount,
-                                 commitLogLowerBound,
-                                 commitLogUpperBound,
+                                 commitLogIntervals,
                                  minTimestamp,
                                  maxTimestamp,
                                  minLocalDeletionTime,
@@ -186,8 +184,7 @@ public class StatsMetadata extends MetadataComponent
         return new EqualsBuilder()
                        .append(estimatedPartitionSize, that.estimatedPartitionSize)
                        .append(estimatedColumnCount, that.estimatedColumnCount)
-                       .append(commitLogLowerBound, that.commitLogLowerBound)
-                       .append(commitLogUpperBound, that.commitLogUpperBound)
+                       .append(commitLogIntervals, that.commitLogIntervals)
                        .append(minTimestamp, that.minTimestamp)
                        .append(maxTimestamp, that.maxTimestamp)
                        .append(minLocalDeletionTime, that.minLocalDeletionTime)
@@ -212,8 +209,7 @@ public class StatsMetadata extends MetadataComponent
         return new HashCodeBuilder()
                        .append(estimatedPartitionSize)
                        .append(estimatedColumnCount)
-                       .append(commitLogLowerBound)
-                       .append(commitLogUpperBound)
+                       .append(commitLogIntervals)
                        .append(minTimestamp)
                        .append(maxTimestamp)
                        .append(minLocalDeletionTime)
@@ -239,7 +235,7 @@ public class StatsMetadata extends MetadataComponent
             int size = 0;
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
-            size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
+            size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE));
             if (version.storeRows())
                 size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
             else
@@ -258,7 +254,9 @@ public class StatsMetadata extends MetadataComponent
             if (version.storeRows())
                 size += 8 + 8; // totalColumnsSet, totalRows
             if (version.hasCommitLogLowerBound())
-                size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
+                size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE));
+            if (version.hasCommitLogIntervals())
+                size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals);
             return size;
         }
 
@@ -266,7 +264,7 @@ public class StatsMetadata extends MetadataComponent
         {
             EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
             EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
+            ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
             out.writeLong(component.minTimestamp);
             out.writeLong(component.maxTimestamp);
             if (version.storeRows())
@@ -296,7 +294,9 @@ public class StatsMetadata extends MetadataComponent
             }
 
             if (version.hasCommitLogLowerBound())
-                ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
+                ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
+            if (version.hasCommitLogIntervals())
+                replayPositionSetSerializer.serialize(component.commitLogIntervals, out);
         }
 
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -338,11 +338,15 @@ public class StatsMetadata extends MetadataComponent
 
             if (version.hasCommitLogLowerBound())
                 commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+            IntervalSet<ReplayPosition> commitLogIntervals;
+            if (version.hasCommitLogIntervals())
+                commitLogIntervals = replayPositionSetSerializer.deserialize(in);
+            else
+                commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound);
 
             return new StatsMetadata(partitionSizes,
                                      columnCounts,
-                                     commitLogLowerBound,
-                                     commitLogUpperBound,
+                                     commitLogIntervals,
                                      minTimestamp,
                                      maxTimestamp,
                                      minLocalDeletionTime,

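One invariant worth calling out in the serializer above: serializedSize() must
branch on the same Version flags, in the same order, as serialize(), or the size
computed for the STATS component drifts from the bytes actually written. A toy
demonstration of keeping the two in lockstep (hypothetical class, simplified
field layout):

    import java.io.*;

    public class SizeContractDemo
    {
        static int serializedSize(boolean hasLowerBound, boolean hasIntervals, int boundCount)
        {
            int size = 8;                                   // legacy upper bound
            if (hasLowerBound) size += 8;                   // lower bound (>= "mb")
            if (hasIntervals)  size += 4 + 8 * boundCount;  // count + bounds (>= "mc")
            return size;
        }

        static byte[] serialize(boolean hasLowerBound, boolean hasIntervals, long[] bounds) throws IOException
        {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeLong(42L);                             // legacy upper bound
            if (hasLowerBound) out.writeLong(7L);
            if (hasIntervals)
            {
                out.writeInt(bounds.length);
                for (long b : bounds) out.writeLong(b);
            }
            return buffer.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            long[] bounds = { 7L, 42L };
            byte[] bytes = serialize(true, true, bounds);
            // The size computation matches the bytes written for the same flags:
            System.out.println(bytes.length == serializedSize(true, true, bounds.length)); // true
        }
    }
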
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 420b802..5f7513f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -70,8 +70,7 @@ public class SSTableMetadataViewer
                     out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
                     out.printf("SSTable Level: %d%n", stats.sstableLevel);
                     out.printf("Repaired at: %d%n", stats.repairedAt);
-                    out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
-                    out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+                    out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
                     out.println("Estimated tombstone drop times:");
                     for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                     {