Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:23 UTC

[18/20] cassandra git commit: Merge commit '78a3d2bba95b9efcda152a157f822f4970f22636' into cassandra-3.7

Merge commit '78a3d2bba95b9efcda152a157f822f4970f22636' into cassandra-3.7


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ac0506
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ac0506
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ac0506

Branch: refs/heads/cassandra-3.7
Commit: c9ac0506c41667ebe9b59acb73d6329401461289
Parents: 902877a 78a3d2b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 12 15:19:52 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 12 15:20:41 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 283 ++++++++++++-------
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  70 +++--
 .../db/commitlog/CommitLogReplayer.java         |  59 ++--
 .../cassandra/db/commitlog/ReplayPosition.java  |  93 ++++--
 .../compaction/AbstractCompactionStrategy.java  |   3 -
 .../compaction/CompactionStrategyManager.java   |   3 -
 .../apache/cassandra/db/lifecycle/Tracker.java  |  53 +++-
 .../org/apache/cassandra/db/lifecycle/View.java |  39 ++-
 .../apache/cassandra/db/view/TableViews.java    |   4 +-
 .../io/sstable/format/SSTableReader.java        |   5 -
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../io/sstable/metadata/CompactionMetadata.java |  11 +-
 .../metadata/LegacyMetadataSerializer.java      |  13 +-
 .../io/sstable/metadata/MetadataCollector.java  |  36 ++-
 .../io/sstable/metadata/StatsMetadata.java      |  39 ++-
 .../cassandra/tools/SSTableMetadataViewer.java  |   3 +-
 .../legacy_mb_clust/mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
 .../legacy_mb_clust/mb-1-big-Data.db            | Bin 0 -> 5342 bytes
 .../legacy_mb_clust/mb-1-big-Digest.crc32       |   1 +
 .../legacy_mb_clust/mb-1-big-Filter.db          | Bin 0 -> 24 bytes
 .../legacy_mb_clust/mb-1-big-Index.db           | Bin 0 -> 157553 bytes
 .../legacy_mb_clust/mb-1-big-Statistics.db      | Bin 0 -> 7058 bytes
 .../legacy_mb_clust/mb-1-big-Summary.db         | Bin 0 -> 47 bytes
 .../legacy_mb_clust/mb-1-big-TOC.txt            |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_mb_clust_compact/mb-1-big-Data.db    | Bin 0 -> 5383 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_clust_compact/mb-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mb_clust_compact/mb-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7058 bytes
 .../legacy_mb_clust_compact/mb-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mb_clust_compact/mb-1-big-TOC.txt    |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../legacy_mb_clust_counter/mb-1-big-Data.db    | Bin 0 -> 4625 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_clust_counter/mb-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_mb_clust_counter/mb-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7067 bytes
 .../legacy_mb_clust_counter/mb-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_mb_clust_counter/mb-1-big-TOC.txt    |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../mb-1-big-Data.db                            | Bin 0 -> 4639 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../mb-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mb-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 7067 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mb-1-big-TOC.txt                            |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple/mb-1-big-Data.db           | Bin 0 -> 91 bytes
 .../legacy_mb_simple/mb-1-big-Digest.crc32      |   1 +
 .../legacy_mb_simple/mb-1-big-Filter.db         | Bin 0 -> 24 bytes
 .../legacy_mb_simple/mb-1-big-Index.db          | Bin 0 -> 26 bytes
 .../legacy_mb_simple/mb-1-big-Statistics.db     | Bin 0 -> 4611 bytes
 .../legacy_mb_simple/mb-1-big-Summary.db        | Bin 0 -> 47 bytes
 .../legacy_mb_simple/mb-1-big-TOC.txt           |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple_compact/mb-1-big-Data.db   | Bin 0 -> 91 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_simple_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mb_simple_compact/mb-1-big-Index.db  | Bin 0 -> 26 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4652 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mb_simple_compact/mb-1-big-TOC.txt   |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_mb_simple_counter/mb-1-big-Data.db   | Bin 0 -> 115 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../legacy_mb_simple_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_mb_simple_counter/mb-1-big-Index.db  | Bin 0 -> 27 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4620 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_mb_simple_counter/mb-1-big-TOC.txt   |   8 +
 .../mb-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../mb-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../mb-1-big-Digest.crc32                       |   1 +
 .../mb-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../mb-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../mb-1-big-Statistics.db                      | Bin 0 -> 4661 bytes
 .../mb-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../mb-1-big-TOC.txt                            |   8 +
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../cassandra/db/RecoveryManagerTest.java       |   2 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   4 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  30 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../CompressedRandomAccessReaderTest.java       |   4 +-
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../metadata/MetadataSerializerTest.java        |  83 +++++-
 .../cassandra/io/util/MmappedRegionsTest.java   |   3 +-
 94 files changed, 644 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d138389,677ea11..fb8d272
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -3,78 -2,9 +3,79 @@@ Merged from 3.0
   * Refactor Materialized View code (CASSANDRA-11475)
   * Update Java Driver (CASSANDRA-11615)
  Merged from 2.2:
++ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
   * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
  
 -3.0.6
 +3.6
 + * Allow server startup if JMX is configured directly (CASSANDRA-11725)
 + * Prevent direct memory OOM on buffer pool allocations (CASSANDRA-11710)
 + * Enhanced Compaction Logging (CASSANDRA-10805)
 + * Make prepared statement cache size configurable (CASSANDRA-11555)
 + * Integrated JMX authentication and authorization (CASSANDRA-10091)
 + * Add units to stress output (CASSANDRA-11352)
 + * Fix PER PARTITION LIMIT for single and multi partitions queries (CASSANDRA-11603)
 + * Add uncompressed chunk cache for RandomAccessReader (CASSANDRA-5863)
 + * Clarify ClusteringPrefix hierarchy (CASSANDRA-11213)
 + * Always perform collision check before joining ring (CASSANDRA-10134)
 + * SSTableWriter output discrepancy (CASSANDRA-11646)
 + * Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756)
 + * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)
 + * Add support to rebuild from specific range (CASSANDRA-10406)
 + * Optimize the overlapping lookup by calculating all the
 +   bounds in advance (CASSANDRA-11571)
 + * Support json/yaml output in nodetool tablestats (CASSANDRA-5977)
 + * (stress) Add datacenter option to -node options (CASSANDRA-11591)
 + * Fix handling of empty slices (CASSANDRA-11513)
 + * Make number of cores used by cqlsh COPY visible to testing code (CASSANDRA-11437)
 + * Allow filtering on clustering columns for queries without secondary indexes (CASSANDRA-11310)
 + * Refactor Restriction hierarchy (CASSANDRA-11354)
 + * Eliminate allocations in R/W path (CASSANDRA-11421)
 + * Update Netty to 4.0.36 (CASSANDRA-11567)
 + * Fix PER PARTITION LIMIT for queries requiring post-query ordering (CASSANDRA-11556)
 + * Allow instantiation of UDTs and tuples in UDFs (CASSANDRA-10818)
 + * Support UDT in CQLSSTableWriter (CASSANDRA-10624)
 + * Support for non-frozen user-defined types, updating
 +   individual fields of user-defined types (CASSANDRA-7423)
 + * Make LZ4 compression level configurable (CASSANDRA-11051)
 + * Allow per-partition LIMIT clause in CQL (CASSANDRA-7017)
 + * Make custom filtering more extensible with UserExpression (CASSANDRA-11295)
 + * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649)
 + * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507)
 + * More user friendly error when providing an invalid token to nodetool (CASSANDRA-9348)
 + * Add static column support to SASI index (CASSANDRA-11183)
 + * Support EQ/PREFIX queries in SASI CONTAINS mode without tokenization (CASSANDRA-11434)
 + * Support LIKE operator in prepared statements (CASSANDRA-11456)
 + * Add a command to see if a Materialized View has finished building (CASSANDRA-9967)
 + * Log endpoint and port associated with streaming operation (CASSANDRA-8777)
 + * Print sensible units for all log messages (CASSANDRA-9692)
 + * Upgrade Netty to version 4.0.34 (CASSANDRA-11096)
 + * Break the CQL grammar into separate Parser and Lexer (CASSANDRA-11372)
 + * Compress only inter-dc traffic by default (CASSANDRA-8888)
 + * Add metrics to track write amplification (CASSANDRA-11420)
 + * cassandra-stress: cannot handle "value-less" tables (CASSANDRA-7739)
 + * Add/drop multiple columns in one ALTER TABLE statement (CASSANDRA-10411)
 + * Add require_endpoint_verification opt for internode encryption (CASSANDRA-9220)
 + * Add auto import java.util for UDF code block (CASSANDRA-11392)
 + * Add --hex-format option to nodetool getsstables (CASSANDRA-11337)
 + * sstablemetadata should print sstable min/max token (CASSANDRA-7159)
 + * Do not wrap CassandraException in TriggerExecutor (CASSANDRA-9421)
 + * COPY TO should have higher double precision (CASSANDRA-11255)
 + * Stress should exit with non-zero status after failure (CASSANDRA-10340)
 + * Add client to cqlsh SHOW_SESSION (CASSANDRA-8958)
 + * Fix nodetool tablestats keyspace level metrics (CASSANDRA-11226)
 + * Store repair options in parent_repair_history (CASSANDRA-11244)
 + * Print current leveling in sstableofflinerelevel (CASSANDRA-9588)
 + * Change repair message for keyspaces with RF 1 (CASSANDRA-11203)
 + * Remove hard-coded SSL cipher suites and protocols (CASSANDRA-10508)
 + * Improve concurrency in CompactionStrategyManager (CASSANDRA-10099)
 + * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
 + * Refuse to start and print txn log information in case of disk
 +   corruption (CASSANDRA-10112)
 + * Resolve some eclipse-warnings (CASSANDRA-11086)
 + * (cqlsh) Show static columns in a different color (CASSANDRA-11059)
 + * Allow to remove TTLs on table with default_time_to_live (CASSANDRA-11207)
 +Merged from 3.0:
   * Disallow creating view with a static column (CASSANDRA-11602)
   * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593)
   * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618)

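The headline entry in this merge is CASSANDRA-9669: when flushes complete out of order, a single "last replay position" per table is unsafe, because a later-started flush can finish first and advance the position past data that an earlier flush has not yet persisted. The change below therefore records a commit log interval (a lower and an upper bound) per flushed sstable and replays a mutation only when no persisted interval covers it. A minimal, self-contained sketch of that idea follows; the class and method names here are illustrative only, the real ones (ReplayPosition and its ReplayFilter) appear later in this diff:

    import java.util.ArrayList;
    import java.util.List;

    final class ReplayIntervalFilterSketch
    {
        // One interval per flushed sstable: [start, end] in (segment, position) order.
        static final class Interval
        {
            final long startSegment; final int startPosition;
            final long endSegment;   final int endPosition;

            Interval(long startSegment, int startPosition, long endSegment, int endPosition)
            {
                this.startSegment = startSegment; this.startPosition = startPosition;
                this.endSegment = endSegment;     this.endPosition = endPosition;
            }

            boolean covers(long segment, int position)
            {
                boolean afterStart = segment > startSegment
                                     || (segment == startSegment && position >= startPosition);
                boolean beforeEnd  = segment < endSegment
                                     || (segment == endSegment && position <= endPosition);
                return afterStart && beforeEnd;
            }
        }

        private final List<Interval> persisted = new ArrayList<>();

        void addFlushedSSTable(Interval interval) { persisted.add(interval); }

        // A mutation must be replayed only if no flushed sstable already covers it;
        // with out-of-order flush completion the covered region can have holes,
        // which a single high-water-mark position cannot represent.
        boolean shouldReplay(long segment, int position)
        {
            for (Interval interval : persisted)
                if (interval.covers(segment, position))
                    return false;
            return true;
        }

        public static void main(String[] args)
        {
            ReplayIntervalFilterSketch filter = new ReplayIntervalFilterSketch();
            filter.addFlushedSSTable(new Interval(1, 0, 1, 5000)); // earlier flush
            filter.addFlushedSSTable(new Interval(2, 0, 2, 9000)); // later flush, finished first
            System.out.println(filter.shouldReplay(1, 7000)); // true: falls in the hole
            System.out.println(filter.shouldReplay(2, 100));  // false: already persisted
        }
    }
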
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fffd87f,e9a2938..8dedd23
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -920,10 -903,13 +931,13 @@@ public class ColumnFamilyStore implemen
          final boolean flushSecondaryIndexes;
          final OpOrder.Barrier writeBarrier;
          final CountDownLatch latch = new CountDownLatch(1);
-         final ReplayPosition lastReplayPosition;
 -        volatile FSWriteError flushFailure = null;
 +        volatile Throwable flushFailure = null;
+         final ReplayPosition commitLogUpperBound;
+         final List<Memtable> memtables;
+         final List<Collection<SSTableReader>> readers;
  
-         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
+                           List<Memtable> memtables, List<Collection<SSTableReader>> readers)
          {
              this.writeBarrier = writeBarrier;
              this.flushSecondaryIndexes = flushSecondaryIndexes;
@@@ -966,7 -962,9 +990,9 @@@
              metric.pendingFlushes.dec();
  
              if (flushFailure != null)
 -                throw flushFailure;
 +                throw Throwables.propagate(flushFailure);
+ 
+             return commitLogUpperBound;
          }
      }
  
@@@ -1052,7 -1041,8 +1069,7 @@@
                  memtable.cfs.data.markFlushing(memtable);
                  if (memtable.isClean() || truncate)
                  {
-                     memtable.cfs.replaceFlushed(memtable, null);
+                     memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
 -                    memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
                      reclaim(memtable);
                      iter.remove();
                  }
@@@ -1070,108 -1054,23 +1081,109 @@@
              {
                  for (Memtable memtable : memtables)
                  {
-                     flushMemtable(memtable);
 -                    Collection<SSTableReader> readers = memtable.flush();
 -                    memtable.cfs.data.replaceFlushed(memtable, readers);
 -                    reclaim(memtable);
 -                    this.readers.add(readers);
++                    this.readers.add(flushMemtable(memtable));
                  }
              }
 -            catch (FSWriteError e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
              postFlush.latch.countDown();
          }
  
-         public void flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable)
 +        {
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
 +            {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
 +                try
 +                {
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
 +                }
 +                catch (Throwable t)
 +                {
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
 +                }
 +
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
 +
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
-             memtable.cfs.replaceFlushed(memtable, sstables);
++            memtable.cfs.data.replaceFlushed(memtable, sstables);
 +            reclaim(memtable);
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
++            return sstables;
 +        }
 +
          private void reclaim(final Memtable memtable)
          {
              // issue a read barrier for reclaiming the memory, and offload the wait to another thread

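Note how the new flushMemtable threads every cleanup step through a Throwable accumulator: writer.abort(t) and txn.abort(t) chain failures, and maybeFail throws once at the end, so all per-disk writers and the lifecycle transaction are unwound even when several steps fail. A compilable sketch of that idiom using only the JDK (it stands in for the project's Throwables helpers, which this hunk does not show):

    final class AccumulateSketch
    {
        // Run a step, chaining any failure onto an already-accumulated one.
        static Throwable accumulate(Throwable existing, Runnable step)
        {
            try { step.run(); }
            catch (Throwable t)
            {
                if (existing == null) return t;
                existing.addSuppressed(t);
            }
            return existing;
        }

        // Throw once, at the end, if any step failed along the way.
        static void maybeFail(Throwable accumulated)
        {
            if (accumulated != null) throw new RuntimeException(accumulated);
        }

        public static void main(String[] args)
        {
            Throwable acc = null;
            acc = accumulate(acc, () -> System.out.println("commit writer 1"));
            acc = accumulate(acc, () -> { throw new IllegalStateException("disk full"); });
            acc = accumulate(acc, () -> System.out.println("commit writer 3")); // still runs
            maybeFail(acc); // fails here, with the writer-2 error attached
        }
    }
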
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 6fe89a9,93dc5af..27c5372
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -34,7 -34,7 +34,6 @@@ import org.apache.cassandra.config.Colu
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.commitlog.ReplayPosition;
--import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.filter.ClusteringIndexFilter;
  import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -44,19 -44,16 +43,15 @@@ import org.apache.cassandra.db.rows.Unf
  import org.apache.cassandra.dht.*;
  import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
  import org.apache.cassandra.index.transactions.UpdateTransaction;
- import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
- import org.apache.cassandra.io.sstable.SSTableTxnWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 -import org.apache.cassandra.io.util.DiskAwareRunnable;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.ObjectSizes;
 +import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.concurrent.OpOrder;
- import org.apache.cassandra.utils.memory.HeapAllocator;
  import org.apache.cassandra.utils.memory.MemtableAllocator;
  import org.apache.cassandra.utils.memory.MemtablePool;
  
@@@ -257,49 -263,6 +261,48 @@@ public class Memtable implements Compar
          return partitions.size();
      }
  
 +    public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
 +    {
 +        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
 +
 +        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
-             return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
++            return Collections.singletonList(new FlushRunnable(txn));
 +
 +        return createFlushRunnables(localRanges, txn);
 +    }
 +
 +    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
 +    {
 +        assert cfs.getPartitioner().splitter().isPresent();
 +
 +        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 +        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
 +        List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
 +        PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
-         ReplayPosition context = lastReplayPosition.get();
 +        try
 +        {
 +            for (int i = 0; i < boundaries.size(); i++)
 +            {
 +                PartitionPosition t = boundaries.get(i);
-                 runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
++                runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
 +                rangeStart = t;
 +            }
 +            return runnables;
 +        }
 +        catch (Throwable e)
 +        {
 +            throw Throwables.propagate(abortRunnables(runnables, e));
 +        }
 +    }
 +
 +    public Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t)
 +    {
 +        if (runnables != null)
 +            for (FlushRunnable runnable : runnables)
 +                t = runnable.writer.abort(t);
 +        return t;
 +    }
 +
      public String toString()
      {
          return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@@ -352,75 -315,46 +355,68 @@@
          return partitions.get(key);
      }
  
-     public long creationTime()
 -    public Collection<SSTableReader> flush()
--    {
-         return creationTime;
 -        long estimatedSize = estimatedSize();
 -        Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
 -        if (dataDirectory == null)
 -            throw new RuntimeException("Insufficient disk space to write " + estimatedSize + " bytes");
 -        File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
 -        assert sstableDirectory != null : "Flush task is not bound to any disk";
 -        return writeSortedContents(sstableDirectory);
--    }
--
      public long getMinTimestamp()
      {
          return minTimestamp;
      }
  
 -    private long estimatedSize()
 +    class FlushRunnable implements Callable<SSTableMultiWriter>
      {
-         public final ReplayPosition context;
 -        long keySize = 0;
 -        for (PartitionPosition key : partitions.keySet())
 +        private final long estimatedSize;
 +        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
 +
 +        private final boolean isBatchLogTable;
 +        private final SSTableMultiWriter writer;
 +
 +        // keeping these to be able to log what we are actually flushing
 +        private final PartitionPosition from;
 +        private final PartitionPosition to;
 +
-         FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
++        FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
          {
-             this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
 -            //  make sure we don't write non-sensical keys
 -            assert key instanceof DecoratedKey;
 -            keySize += ((DecoratedKey)key).getKey().remaining();
++            this(partitions.subMap(from, to), flushLocation, from, to, txn);
          }
 -        return (long) ((keySize // index entries
 -                        + keySize // keys in data file
 -                        + liveDataSize.get()) // data
 -                       * 1.2); // bloom filter and row index overhead
 -    }
  
-         FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
 -    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
 -    {
 -        boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
++        FlushRunnable(LifecycleTransaction txn)
 +        {
-             this(context, partitions, null, null, null, txn);
++            this(partitions, null, null, null, txn);
 +        }
 +
-         FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
++        FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
 +        {
-             this.context = context;
 +            this.toFlush = toFlush;
 +            this.from = from;
 +            this.to = to;
 +            long keySize = 0;
 +            for (PartitionPosition key : toFlush.keySet())
 +            {
 +                //  make sure we don't write non-sensical keys
 +                assert key instanceof DecoratedKey;
 +                keySize += ((DecoratedKey) key).getKey().remaining();
 +            }
 +            estimatedSize = (long) ((keySize // index entries
 +                                    + keySize // keys in data file
 +                                    + liveDataSize.get()) // data
 +                                    * 1.2); // bloom filter and row index overhead
 +
 +            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
  
 -        logger.debug("Writing {}", Memtable.this.toString());
 +            if (flushLocation == null)
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
 +            else
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
 +
 +        }
  
 -        Collection<SSTableReader> ssTables;
 -        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
 +        protected Directories getDirectories()
          {
 +            return cfs.getDirectories();
 +        }
 +
-         private void writeSortedContents(ReplayPosition context)
++        private void writeSortedContents()
 +        {
 +            logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
 +
              boolean trackContention = logger.isTraceEnabled();
              int heavilyContendedRowCount = 0;
              // (we can't clear out the map as-we-go to free up memory,
@@@ -447,37 -381,59 +443,39 @@@
                  }
              }
  
 -            if (writer.getFilePointer() > 0)
 -            {
 -                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
 -                                           writer.getFilename(),
 -                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
 -                                           commitLogUpperBound));
 -
 -                // sstables should contain non-repaired data.
 -                ssTables = writer.finish(true);
 -            }
 -            else
 -            {
 -                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
 -                             writer.getFilename(), commitLogUpperBound);
 -                writer.abort();
 -                ssTables = Collections.emptyList();
 -            }
 +            long bytesFlushed = writer.getFilePointer();
 +            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
 +                                                                              writer.getFilename(),
 +                                                                              FBUtilities.prettyPrintMemory(bytesFlushed),
-                                                                               context));
++                                                                              commitLogUpperBound));
 +            // Update the metrics
 +            cfs.metric.bytesFlushed.inc(bytesFlushed);
  
              if (heavilyContendedRowCount > 0)
 -                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 -
 -            return ssTables;
 +                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
          }
 -    }
  
 -    @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
 -    public SSTableTxnWriter createFlushWriter(String filename,
 -                                              PartitionColumns columns,
 -                                              EncodingStats stats)
 -    {
 -        // we operate "offline" here, as we expose the resulting reader consciously when done
 -        // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
 -        LifecycleTransaction txn = null;
 -        try
 +        public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
 +                                                  String filename,
 +                                                  PartitionColumns columns,
 +                                                  EncodingStats stats)
          {
-             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
 -            txn = LifecycleTransaction.offline(OperationType.FLUSH);
+             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
 -                                                         .commitLogLowerBound(commitLogLowerBound.get())
 -                                                         .commitLogUpperBound(commitLogUpperBound.get());
 -
 -            return new SSTableTxnWriter(txn,
 -                                        cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
 -                                                                     (long) partitions.size(),
 -                                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
 -                                                                     sstableMetadataCollector,
 -                                                                     new SerializationHeader(true, cfs.metadata, columns, stats),
 -                                                                     txn));
++                    .commitLogLowerBound(commitLogLowerBound.get())
++                    .commitLogUpperBound(commitLogUpperBound.get());
 +            return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
 +                                                (long)toFlush.size(),
 +                                                ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                                sstableMetadataCollector,
 +                                                new SerializationHeader(true, cfs.metadata, columns, stats), txn);
 +
          }
 -        catch (Throwable t)
 +
 +        @Override
 +        public SSTableMultiWriter call()
          {
-             writeSortedContents(context);
 -            if (txn != null)
 -                txn.close();
 -            throw t;
++            writeSortedContents();
 +            return writer;
          }
      }
  

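FlushRunnable is now a Callable<SSTableMultiWriter>, and flushRunnables/createFlushRunnables slice the memtable's sorted partition map into one half-open range per data directory via subMap, so each disk flushes its own slice in parallel without copying partitions. A rough sketch of that splitting step with illustrative names (the real boundary computation lives in StorageService.getDiskBoundaries):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    final class RangeSplitSketch
    {
        // Divide a sorted partition map into half-open per-disk slices [start, boundary);
        // subMap returns live views, so no partition data is copied.
        static <K, V> List<ConcurrentNavigableMap<K, V>> split(ConcurrentNavigableMap<K, V> partitions,
                                                               K minKey, List<K> boundaries)
        {
            List<ConcurrentNavigableMap<K, V>> slices = new ArrayList<>(boundaries.size());
            K rangeStart = minKey;
            for (K boundary : boundaries)
            {
                slices.add(partitions.subMap(rangeStart, boundary));
                rangeStart = boundary;
            }
            return slices;
        }

        public static void main(String[] args)
        {
            ConcurrentNavigableMap<Integer, String> m = new ConcurrentSkipListMap<>();
            for (int i = 0; i < 10; i++)
                m.put(i, "partition-" + i);
            // Two data directories with disk boundaries at 5 and 10:
            System.out.println(split(m, 0, Arrays.asList(5, 10))); // [0..4] and [5..9]
        }
    }
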
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index e00a0a4,f45a47a..93c6c50
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -23,17 -23,8 +23,16 @@@ import java.io.EOFException
  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
 -import java.nio.ByteBuffer;
 -import java.util.*;
 +import java.util.ArrayDeque;
 +import java.util.ArrayList;
- import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicInteger;
@@@ -45,27 -35,23 +44,25 @@@ import com.google.common.base.Throwable
  import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
- import com.google.common.collect.Ordering;
  import com.google.common.util.concurrent.Uninterruptibles;
 -
  import org.apache.commons.lang3.StringUtils;
 +import org.cliffc.high_scale_lib.NonBlockingHashSet;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.util.FileSegmentInputStream;
 -import org.apache.cassandra.io.util.RebufferingInputStream;
 -import org.apache.cassandra.schema.CompressionParams;
 -import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.db.rows.SerializationHelper;
  import org.apache.cassandra.io.util.ChannelProxy;
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.FileDataInput;
@@@ -89,10 -71,10 +86,10 @@@ public class CommitLogReplaye
      private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
  
      private final Set<Keyspace> keyspacesRecovered;
 -    private final List<Future<?>> futures;
 +    private final Queue<Future<Integer>> futures;
      private final Map<UUID, AtomicInteger> invalidMutations;
      private final AtomicInteger replayedCount;
-     private final Map<UUID, ReplayPosition> cfPositions;
+     private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
      private final ReplayPosition globalPosition;
      private final CRC32 checksum;
      private byte[] buffer;
@@@ -101,79 -83,12 +98,77 @@@
      private final ReplayFilter replayFilter;
      private final CommitLogArchiver archiver;
  
 +    /*
 +     * Wrapper around initiating mutations read from the log to make it possible
 +     * to spy on initiated mutations for test
 +     */
 +    @VisibleForTesting
 +    public static class MutationInitiator
 +    {
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                                                   final long segmentId,
 +                                                   final int serializedSize,
-                                                    final long entryLocation,
++                                                   final int entryLocation,
 +                                                   final CommitLogReplayer clr)
 +        {
 +            Runnable runnable = new WrappedRunnable()
 +            {
 +                public void runMayThrow()
 +                {
 +                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                        return;
 +                    if (clr.pointInTimeExceeded(mutation))
 +                        return;
 +
 +                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                    // Rebuild the mutation, omitting column families that
 +                    //    a) the user has requested that we ignore,
 +                    //    b) have already been flushed,
 +                    // or c) are part of a cf that was dropped.
 +                    // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 +                    Mutation newMutation = null;
 +                    for (PartitionUpdate update : clr.replayFilter.filter(mutation))
 +                    {
 +                        if (Schema.instance.getCF(update.metadata().cfId) == null)
 +                            continue; // dropped
 +
-                         ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
- 
 +                        // replay if current segment is newer than last flushed one or,
 +                        // if it is the last known segment, if we are after the replay position
-                         if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
++                        if (clr.shouldReplay(update.metadata().cfId, new ReplayPosition(segmentId, entryLocation)))
 +                        {
 +                            if (newMutation == null)
 +                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                            newMutation.add(update);
 +                            clr.replayedCount.incrementAndGet();
 +                        }
 +                    }
 +                    if (newMutation != null)
 +                    {
 +                        assert !newMutation.isEmpty();
 +
 +                        try
 +                        {
 +                            Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
 +                        }
 +                        catch (ExecutionException e)
 +                        {
 +                            throw Throwables.propagate(e.getCause());
 +                        }
 +
 +                        clr.keyspacesRecovered.add(keyspace);
 +                    }
 +                }
 +            };
 +            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
 +        }
 +    }
 +
-     CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
+     CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
      {
          this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
 -        this.futures = new ArrayList<Future<?>>();
 +        this.futures = new ArrayDeque<Future<Integer>>();
          this.buffer = new byte[4096];
 -        this.uncompressedBuffer = new byte[4096];
          this.invalidMutations = new HashMap<UUID, AtomicInteger>();
          // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
          this.replayedCount = new AtomicInteger();
@@@ -342,11 -285,22 +334,24 @@@
          }
      }
  
+     /**
+      * consult the known-persisted ranges for our sstables;
+      * if the position is covered by one of them it does not need to be replayed
+      *
+      * @return true iff replay is necessary
+      */
+     private boolean shouldReplay(UUID cfId, ReplayPosition position)
+     {
+         ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+         return filter == null || filter.shouldReplay(position);
+     }
+ 
+     @SuppressWarnings("resource")
      public void recover(File file, boolean tolerateTruncation) throws IOException
      {
 +        // just transform from the file name (no reading of headers) to determine version
          CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
          try(ChannelProxy channel = new ChannelProxy(file);
              RandomAccessReader reader = RandomAccessReader.open(channel))
          {

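The MutationInitiator above rebuilds each replayed mutation, keeping only the partition updates whose table still needs the data (the shouldReplay check against the per-cfId persisted intervals); if nothing survives, the whole mutation is skipped. A stripped-down sketch of that filtering step, with hypothetical types standing in for Mutation and PartitionUpdate:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    final class ReplayFilterSketch
    {
        static final class Position { final long segment; final int offset;
            Position(long s, int o) { segment = s; offset = o; } }
        static final class Update   { final UUID cfId;
            Update(UUID id) { cfId = id; } }

        interface ShouldReplay { boolean test(UUID cfId, Position position); }

        // Rebuild a mutation for replay: drop updates for tables whose flushed
        // sstables already cover this position; callers apply only non-empty results.
        static List<Update> filterForReplay(List<Update> updates, Position position,
                                            ShouldReplay shouldReplay)
        {
            List<Update> kept = new ArrayList<>();
            for (Update u : updates)
                if (shouldReplay.test(u.cfId, position))
                    kept.add(u);
            return kept;
        }

        public static void main(String[] args)
        {
            UUID flushed = UUID.randomUUID();
            UUID pending = UUID.randomUUID();
            List<Update> updates = Arrays.asList(new Update(flushed), new Update(pending));
            // Pretend only 'pending' still needs replay at this position:
            List<Update> kept = filterForReplay(updates, new Position(7, 1024),
                                                (cfId, pos) -> cfId.equals(pending));
            System.out.println(kept.size()); // 1
        }
    }
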
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index dd07b19,16090a1..be1436c
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -329,10 -332,10 +334,10 @@@ public class Tracke
          apply(View.markFlushing(memtable));
      }
  
 -    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
 +    public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
      {
          assert !isDummy();
-         if (sstables == null || Iterables.isEmpty(sstables))
 -        if (sstables.isEmpty())
++        if (Iterables.isEmpty(sstables))
          {
              // sstable may be null if we flushed batchlog and nothing needed to be retained
              // if it's null, we don't care what state the cfstore is in, we just replace it and continue
@@@ -348,14 -351,7 +353,9 @@@
  
          Throwable fail;
          fail = updateSizeTracking(emptySet(), sstables, null);
-         // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-         fail = notifyAdded(sstables, fail);
  
 +        notifyDiscarded(memtable);
 +
-         if (!isDummy() && !cfstore.isValid())
-             dropSSTables();
- 
          maybeFail(fail);
      }
  

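One detail in the merged replaceFlushed: the sstables parameter widens from Collection to Iterable, so the emptiness test switches to Guava's Iterables.isEmpty, which works on any Iterable without materializing it. For example (assuming Guava on the classpath, as the project already has):

    import com.google.common.collect.Iterables;
    import java.util.Collections;

    final class EmptinessSketch
    {
        public static void main(String[] args)
        {
            Iterable<String> flushed = Collections.emptyList();
            // No cast to Collection and no iteration beyond the first element:
            System.out.println(Iterables.isEmpty(flushed)); // true
        }
    }
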
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 63926ed,17062b4..cde6363
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -319,12 -321,14 +321,14 @@@ public class Vie
                  List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
                  assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
  
-                 if (flushed == null)
 -                if (flushed == null || flushed.isEmpty())
++                if (flushed == null || Iterables.isEmpty(flushed))
                      return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
-                                     view.compactingMap, view.intervalTree);
+                                     view.compactingMap, view.premature, view.intervalTree);
  
                  Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
-                 return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
+                 Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
+                 Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
+                 return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
                                  SSTableIntervalTree.build(sstableMap.keySet()));
              }
          };

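View is immutable: Tracker publishes a flush by computing a replacement View (the Function built above) and installing it atomically, so readers always observe a consistent snapshot. A generic sketch of that publication pattern using only java.util.concurrent; the retry loop is the essence of Tracker.apply, though the real signature differs:

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.UnaryOperator;

    final class AtomicViewSketch<V>
    {
        private final AtomicReference<V> current;

        AtomicViewSketch(V initial) { current = new AtomicReference<>(initial); }

        // Apply a pure transformation and publish the result; racing writers
        // retry against the freshest view, and readers never see a half-applied state.
        V apply(UnaryOperator<V> transform)
        {
            while (true)
            {
                V view = current.get();
                V updated = transform.apply(view);
                if (current.compareAndSet(view, updated))
                    return updated;
            }
        }
    }
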
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index e030b5b,e0fb3b1..9b6f491
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -120,8 -121,10 +120,10 @@@ public class BigFormat implements SSTab
          //             switch uncompressed checksums to adler32
          //             tracks presence of legacy (local and remote) counter shards
          // la (2.2.0): new file name format
+         // lb (2.2.7): commit log lower bound included
          // ma (3.0.0): swap bf hash order
          //             store rows natively
 -        // mb (3.0.6): commit log lower bound included
++        // mb (3.0.7, 3.7): commit log lower bound included
          //
          // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
  

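The version gating works because format versions are two lowercase letters that sort lexicographically ("la" < "lb" < "ma" < "mb"), so a capability flag can be derived by string comparison. A hedged sketch of how the new flag would be computed (the actual field lives in BigFormat's Version subclass, which this hunk does not show):

    final class FormatVersionSketch
    {
        // The commit log lower bound was added in lb (2.2.7) and mb (3.0.7, 3.7);
        // the intervening ma (3.0.0) does not have it.
        static boolean hasCommitLogLowerBound(String version)
        {
            return (version.compareTo("lb") >= 0 && version.charAt(0) == 'l')
                   || version.compareTo("mb") >= 0;
        }

        public static void main(String[] args)
        {
            System.out.println(hasCommitLogLowerBound("la")); // false
            System.out.println(hasCommitLogLowerBound("lb")); // true
            System.out.println(hasCommitLogLowerBound("ma")); // false
            System.out.println(hasCommitLogLowerBound("mb")); // true
        }
    }
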
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index c574a48,53cf0b0..ca50a44
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -20,10 -20,14 +20,11 @@@ package org.apache.cassandra.io.sstable
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.Collections;
 -import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
 -import java.util.Set;
  
  import com.google.common.collect.Maps;
+ import com.google.common.collect.Ordering;
  
  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
  import com.clearspring.analytics.stream.cardinality.ICardinality;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index d2e0513,420b802..3b3d7e1
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -116,11 -70,9 +116,12 @@@ public class SSTableMetadataViewe
                      out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
                      out.printf("SSTable Level: %d%n", stats.sstableLevel);
                      out.printf("Repaired at: %d%n", stats.repairedAt);
-                     out.println(stats.replayPosition);
+                     out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
+                     out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
 +                    out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
 +                    out.printf("totalRows: %s%n", stats.totalRows);
                      out.println("Estimated tombstone drop times:");
 +
                      for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                      {
                          out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());

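With the new StatsMetadata fields, sstablemetadata output for an mb-format sstable would show both bounds. An illustrative transcript only; the values are placeholders, not real output:

    $ tools/bin/sstablemetadata mb-1-big-Data.db
    ...
    Minimum replay position: ReplayPosition(segmentId=..., position=...)
    Maximum replay position: ReplayPosition(segmentId=..., position=...)
    ...
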
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index e6f9499,31dea3e..8e45eea
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -461,13 -448,11 +461,13 @@@ public class CommitLogStressTes
              super(log, discardedPos, null, ReplayFilter.create());
          }
  
 -        int hash = 0;
 -        int cells = 0;
 +        int hash;
 +        int cells;
 +        int discarded;
 +        int skipped;
  
          @Override
-         void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+         void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
          {
              if (desc.id < discardedPos.segment)
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 788757c,baf9466..5ac53f6
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -56,65 -50,6 +56,65 @@@ import org.apache.cassandra.db.commitlo
  public class RecoveryManagerTest
  {
      private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
 +    static final Semaphore blocker = new Semaphore(0);
 +    static final Semaphore blocked = new Semaphore(0);
 +    static CommitLogReplayer.MutationInitiator originalInitiator = null;
 +    static final CommitLogReplayer.MutationInitiator mockInitiator = new CommitLogReplayer.MutationInitiator()
 +    {
 +        @Override
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                final long segmentId,
 +                final int serializedSize,
-                 final long entryLocation,
++                final int entryLocation,
 +                final CommitLogReplayer clr)
 +        {
 +            final Future<Integer> toWrap = super.initiateMutation(mutation,
 +                                                                  segmentId,
 +                                                                  serializedSize,
 +                                                                  entryLocation,
 +                                                                  clr);
 +            return new Future<Integer>()
 +            {
 +
 +                @Override
 +                public boolean cancel(boolean mayInterruptIfRunning)
 +                {
 +                    throw new UnsupportedOperationException();
 +                }
 +
 +                @Override
 +                public boolean isCancelled()
 +                {
 +                    throw new UnsupportedOperationException();
 +                }
 +
 +                @Override
 +                public boolean isDone()
 +                {
 +                    return blocker.availablePermits() > 0 && toWrap.isDone();
 +                }
 +
 +                @Override
 +                public Integer get() throws InterruptedException, ExecutionException
 +                {
 +                    System.out.println("Got blocker once");
 +                    blocked.release();
 +                    blocker.acquire();
 +                    return toWrap.get();
 +                }
 +
 +                @Override
 +                public Integer get(long timeout, TimeUnit unit)
 +                        throws InterruptedException, ExecutionException, TimeoutException
 +                {
 +                    blocked.release();
 +                    blocker.tryAcquire(1, timeout, unit);
 +                    return toWrap.get(timeout, unit);
 +                }
 +
 +            };
 +        }
 +    };
  
      private static final String KEYSPACE1 = "RecoveryManagerTest1";
      private static final String CF_STANDARD1 = "Standard1";

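The mock initiator parks each replayed mutation on the two semaphores declared above: get() releases blocked and then waits on blocker. A test would drive it roughly like this (an assumed caller shape, since the test bodies are outside this hunk; the fragment belongs in the same class so it can see the static semaphores):

    // Hypothetical helper: pause replay mid-mutation, act, then resume.
    static void withReplayPaused(Runnable actionWhilePaused) throws InterruptedException
    {
        blocked.acquire();       // wait until the wrapped future has parked a mutation
        actionWhilePaused.run(); // e.g. let a later flush complete first (out of order)
        blocker.release();       // unpark the mutation so replay can finish
    }
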
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index b5cbf8b,555cdda..1ea0eb1
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -584,147 -555,21 +584,147 @@@ public class CommitLogTes
      }
  
      @Test
 -    public void testDescriptorInvalidParametersSize() throws IOException
 +    public void replay_Compressed_Deflate() throws IOException
      {
 -        Map<String, String> params = new HashMap<>();
 -        for (int i=0; i<65535; ++i)
 -            params.put("key"+i, Integer.toString(i, 16));
 -        try {
 -            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
 -                                                               21,
 -                                                               new ParameterizedClass("LZ4Compressor", params));
 -            ByteBuffer buf = ByteBuffer.allocate(1024000);
 -            CommitLogDescriptor.writeHeader(buf, desc);
 -            Assert.fail("Parameter object too long should fail on writing descriptor.");
 -        } catch (ConfigurationException e)
 +        replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
 +    }
 +
 +    private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
 +    {
 +        ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
 +        EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
 +        try
 +        {
 +            DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
 +            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
 +            CommitLog.instance.resetUnsafe(true);
 +
 +            replaySimple(CommitLog.instance);
 +            replayWithDiscard(CommitLog.instance);
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setCommitLogCompression(originalCompression);
 +            DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
 +            CommitLog.instance.resetUnsafe(true);
 +        }
 +    }
 +
 +    @Test
 +    public void replay_Encrypted() throws IOException
 +    {
 +        ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
 +        EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
 +        try
 +        {
 +            DatabaseDescriptor.setCommitLogCompression(null);
 +            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
 +            CommitLog.instance.resetUnsafe(true);
 +
 +            replaySimple(CommitLog.instance);
 +            replayWithDiscard(CommitLog.instance);
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setCommitLogCompression(originalCompression);
 +            DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
 +            CommitLog.instance.resetUnsafe(true);
 +        }
 +    }
 +
 +    private void replaySimple(CommitLog commitLog) throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        commitLog.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        commitLog.add(rm2);
 +
 +        commitLog.sync(true);
 +
 +        Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
 +        List<String> activeSegments = commitLog.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    private void replayWithDiscard(CommitLog commitLog) throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        ReplayPosition replayPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + i)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
 +            ReplayPosition position = commitLog.add(rm1);
 +
 +            if (i == discardPosition)
 +                replayPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
 +        commitLog.sync(true);
 +
 +        Replayer replayer = new Replayer(commitLog, replayPosition);
 +        List<String> activeSegments = commitLog.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class Replayer extends CommitLogReplayer
 +    {
 +        private final ReplayPosition filterPosition;
 +        int cells;
 +        int skipped;
 +
 +        Replayer(CommitLog commitLog, ReplayPosition filterPosition)
 +        {
-             super(commitLog, filterPosition, null, ReplayFilter.create());
++            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +        }
 +
 +        @SuppressWarnings("resource")
-         void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
++        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
          {
 -            // correct path
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +
 +            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +            Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +            for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
 +                for (Row row : partitionUpdate)
 +                    cells += Iterables.size(row.cells());
          }
      }
  }

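The replaySimple and replayWithDiscard helpers added above follow the same
shape: write mutations, sync the commit log, then replay the active segments
with the custom Replayer and compare its cell count against the number of
mutations written. replayWithDiscard additionally records the ReplayPosition
of one mid-stream write and asserts that every entry at or before that
position is skipped. A minimal sketch of that filter logic, using the
illustrative names PositionFilter and shouldReplay (not the Replayer API):

    public class PositionFilter
    {
        private final int filterPosition;
        int replayed;
        int skipped;

        PositionFilter(int filterPosition)
        {
            this.filterPosition = filterPosition;
        }

        boolean shouldReplay(int entryLocation)
        {
            // Mirrors the Replayer above: entries at or before the recorded
            // position are skipped, everything after it is replayed.
            if (entryLocation <= filterPosition)
            {
                skipped++;
                return false;
            }
            replayed++;
            return true;
        }

        public static void main(String[] args)
        {
            PositionFilter filter = new PositionFilter(800);
            for (int pos = 1; pos <= 1024; pos++)
                filter.shouldReplay(pos);
            // Prints "800 skipped, 224 replayed"
            System.out.println(filter.skipped + " skipped, " + filter.replayed + " replayed");
        }
    }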
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 0294115,b8de711..902e17a
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -298,11 -298,13 +299,14 @@@ public class TrackerTes
          Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
  
          SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
 -        tracker.replaceFlushed(prev2, Collections.singleton(reader));
 +        tracker.replaceFlushed(prev2, singleton(reader));
          Assert.assertEquals(1, tracker.getView().sstables.size());
+         Assert.assertEquals(1, tracker.getView().premature.size());
+         tracker.permitCompactionOfFlushed(singleton(reader));
+         Assert.assertEquals(0, tracker.getView().premature.size());
 -        Assert.assertEquals(1, listener.received.size());
 -        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
 +        Assert.assertEquals(2, listener.received.size());
-         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
-         Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
++        Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
++        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
          listener.received.clear();
          Assert.assertTrue(reader.isKeyCacheSetup());
          Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@@ -316,17 -318,13 +320,17 @@@
          tracker.markFlushing(prev1);
          reader = MockSchema.sstable(0, 10, true, cfs);
          cfs.invalidate(false);
 -        tracker.replaceFlushed(prev1, Collections.singleton(reader));
 +        tracker.replaceFlushed(prev1, singleton(reader));
+         tracker.permitCompactionOfFlushed(Collections.singleton(reader));
          Assert.assertEquals(0, tracker.getView().sstables.size());
          Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
          Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
 -        Assert.assertEquals(reader, (((SSTableDeletingNotification) listener.received.get(0)).deleting));
 -        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
 +        System.out.println(listener.received);
-         Assert.assertEquals(5, listener.received.size());
++        Assert.assertEquals(4, listener.received.size());
 +        Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
-         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
-         Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(2)).memtable);
-         Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
-         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
++        Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
++        Assert.assertTrue(listener.received.get(2) instanceof SSTableDeletingNotification);
++        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(3)).removed.size());
          DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
      }
  

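The TrackerTest changes above assert the new lifecycle introduced by this
merge: a flushed memtable's MemtableDiscardedNotification now arrives before
the SSTableAddedNotification, and a freshly flushed sstable is held in the
view's "premature" set, so it is readable but not compactable, until
permitCompactionOfFlushed releases it. A minimal sketch of that flush/permit
sequence; MiniTracker and its methods are illustrative stand-ins, not the
Tracker API:

    import java.util.HashSet;
    import java.util.Set;

    public class MiniTracker
    {
        final Set<String> sstables = new HashSet<>();   // visible to reads
        final Set<String> premature = new HashSet<>();  // not yet compactable

        void replaceFlushed(String sstable)
        {
            sstables.add(sstable);
            premature.add(sstable);
        }

        void permitCompactionOfFlushed(String sstable)
        {
            premature.remove(sstable);
        }

        public static void main(String[] args)
        {
            MiniTracker tracker = new MiniTracker();
            tracker.replaceFlushed("mb-1-big");
            System.out.println(tracker.premature.size()); // 1, as the test asserts
            tracker.permitCompactionOfFlushed("mb-1-big");
            System.out.println(tracker.premature.size()); // 0 once permitted
        }
    }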
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ac0506/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
----------------------------------------------------------------------