Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:10 UTC

[19/23] cassandra git commit: Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index b1c706e,5a3d524..f464e08
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -353,35 -347,13 +349,16 @@@ public class Tracke
  
          Throwable fail;
          fail = updateSizeTracking(emptySet(), sstables, null);
 +
 +        notifyDiscarded(memtable);
 +
-         maybeFail(fail);
-     }
- 
-     /**
-      * permit compaction of the provided sstable; this translates to notifying compaction
-      * strategies of its existence, and potentially submitting a background task
-      */
-     public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
-     {
-         if (sstables.isEmpty())
-             return;
+         // TODO: if we're invalidated, should we notifyAdded AND removed, or just skip both?
+         fail = notifyAdded(sstables, fail);
  
-         apply(View.permitCompactionOfFlushed(sstables));
- 
-         if (isDummy())
-             return;
- 
-         if (cfstore.isValid())
-         {
-             notifyAdded(sstables);
-             CompactionManager.instance.submitBackground(cfstore);
-         }
-         else
-         {
+         if (!isDummy() && !cfstore.isValid())
              dropSSTables();
-         }
+ 
+         maybeFail(fail);
      }
  
  

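The net effect of the Tracker hunk: the old two-step flush hand-off (replaceFlushed followed by a separate permitCompactionOfFlushed) collapses into one method. A paraphrase of the post-merge tail of replaceFlushed, with comments added; member names are as in the hunk above, and the rest of the method body is not shown in this diff:

    Throwable fail = updateSizeTracking(emptySet(), sstables, null);
    notifyDiscarded(memtable);             // the memtable's contents are now on disk
    fail = notifyAdded(sstables, fail);    // accumulate errors rather than throw
    if (!isDummy() && !cfstore.isValid())  // CFS was invalidated mid-flush:
        dropSSTables();                    // discard what was just added
    maybeFail(fail);                       // surface any accumulated failure
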
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index a5c781d,4b3aae0..b26426d
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -40,7 -39,7 +39,6 @@@ import static com.google.common.collect
  import static com.google.common.collect.Iterables.all;
  import static com.google.common.collect.Iterables.concat;
  import static com.google.common.collect.Iterables.filter;
--import static com.google.common.collect.Iterables.transform;
  import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
  import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
  import static org.apache.cassandra.db.lifecycle.Helpers.replace;
@@@ -336,14 -333,12 +332,12 @@@ public class Vie
                  List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
                  assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
  
 -                if (flushed == null || flushed.isEmpty())
 +                if (flushed == null || Iterables.isEmpty(flushed))
                      return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
-                                     view.compactingMap, view.premature, view.intervalTree);
+                                     view.compactingMap, view.intervalTree);
  
                  Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
-                 Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
-                 Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
-                 return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
+                 return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
                                  SSTableIntervalTree.build(sstableMap.keySet()));
              }
          };

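With flushed sstables immediately eligible for compaction, View drops its "premature" bookkeeping, and replaceFlushed only rewrites the live sstable map and interval tree. Condensed from the hunk above (Iterables.isEmpty replaces Collection.isEmpty, presumably because the flushed argument is now a plain Iterable):

    Map<SSTableReader, SSTableReader> sstableMap =
        replace(view.sstablesMap, emptySet(), flushed);
    return new View(view.liveMemtables, flushingMemtables, sstableMap,
                    view.compactingMap,  // no longer pre-seeded with the flushed readers
                    SSTableIntervalTree.build(sstableMap.keySet()));
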
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 505de49,a683513..14e391b
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@@ -24,7 -24,8 +24,8 @@@ import java.util.*
  import com.google.common.collect.Maps;
  
  import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
 -import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.format.Version;
@@@ -35,6 -36,8 +36,8 @@@ import org.apache.cassandra.utils.ByteB
  import org.apache.cassandra.utils.EstimatedHistogram;
  import org.apache.cassandra.utils.StreamingHistogram;
  
 -import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer;
++import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.commitLogPositionSetSerializer;
+ 
  /**
   * Serializer for SSTable from legacy versions
   */
@@@ -55,7 -58,7 +58,7 @@@ public class LegacyMetadataSerializer e
  
          EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
          EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
-         CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out);
 -        ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
++        CommitLogPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
          out.writeLong(stats.minTimestamp);
          out.writeLong(stats.maxTimestamp);
          out.writeInt(stats.maxLocalDeletionTime);
@@@ -72,7 -75,9 +75,9 @@@
          for (ByteBuffer value : stats.maxClusteringValues)
              ByteBufferUtil.writeWithShortLength(value, out);
          if (version.hasCommitLogLowerBound())
-             CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out);
 -            ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
++            CommitLogPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
+         if (version.hasCommitLogIntervals())
 -            replayPositionSetSerializer.serialize(stats.commitLogIntervals, out);
++            commitLogPositionSetSerializer.serialize(stats.commitLogIntervals, out);
      }
  
      /**
@@@ -120,7 -125,12 +125,12 @@@
                      maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
  
                  if (descriptor.version.hasCommitLogLowerBound())
 -                    commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
 -                IntervalSet<ReplayPosition> commitLogIntervals;
 +                    commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
++                IntervalSet<CommitLogPosition> commitLogIntervals;
+                 if (descriptor.version.hasCommitLogIntervals())
 -                    commitLogIntervals = replayPositionSetSerializer.deserialize(in);
++                    commitLogIntervals = commitLogPositionSetSerializer.deserialize(in);
+                 else
+                     commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound);
  
                  if (types.contains(MetadataType.VALIDATION))
                      components.put(MetadataType.VALIDATION,

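The legacy metadata format stores only two positions, so the serializer collapses the new IntervalSet to its bounds on write (substituting CommitLogPosition.NONE when a bound is absent) and rebuilds a single covering interval on read. The read side, condensed from the last hunk above:

    IntervalSet<CommitLogPosition> commitLogIntervals =
        descriptor.version.hasCommitLogIntervals()
            ? commitLogPositionSetSerializer.deserialize(in)
            : new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound);
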
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 299bc87,1ff2ca8..196cfbf
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -20,16 -20,18 +20,16 @@@ package org.apache.cassandra.io.sstable
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.Collections;
 -import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
 -import java.util.Set;
  
  import com.google.common.collect.Maps;
- import com.google.common.collect.Ordering;
  
  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
  import com.clearspring.analytics.stream.cardinality.ICardinality;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
  import org.apache.cassandra.db.rows.Cell;
@@@ -88,8 -89,7 +87,7 @@@ public class MetadataCollector implemen
      protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
      // TODO: count the number of rows per partition (either with the number of cells, or instead)
      protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
-     protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
-     protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
 -    protected IntervalSet commitLogIntervals = IntervalSet.empty();
++    protected IntervalSet<CommitLogPosition> commitLogIntervals = IntervalSet.empty();
      protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
      protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
      protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@@ -123,23 -123,13 +121,13 @@@
      {
          this(comparator);
  
-         CommitLogPosition min = null, max = null;
 -        IntervalSet.Builder intervals = new IntervalSet.Builder();
++        IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>();
          for (SSTableReader sstable : sstables)
          {
-             if (min == null)
-             {
-                 min = sstable.getSSTableMetadata().commitLogLowerBound;
-                 max = sstable.getSSTableMetadata().commitLogUpperBound;
-             }
-             else
-             {
-                 min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
-                 max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
-             }
+             intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
          }
  
-         commitLogLowerBound(min);
-         commitLogUpperBound(max);
+         commitLogIntervals(intervals.build());
          sstableLevel(level);
      }
  
@@@ -226,15 -216,9 +214,9 @@@
          ttlTracker.update(newTTL);
      }
  
-     public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound)
 -    public MetadataCollector commitLogIntervals(IntervalSet commitLogIntervals)
++    public MetadataCollector commitLogIntervals(IntervalSet<CommitLogPosition> commitLogIntervals)
      {
-         this.commitLogLowerBound = commitLogLowerBound;
-         return this;
-     }
- 
-     public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound)
-     {
-         this.commitLogUpperBound = commitLogUpperBound;
+         this.commitLogIntervals = commitLogIntervals;
          return this;
      }
  

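The commit-log coverage of a compaction result is now the union of the inputs' interval sets, rather than a min/max over two bounds. Isolating the new constructor logic from the hunk above:

    IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>();
    for (SSTableReader sstable : sstables)
        intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
    commitLogIntervals(intervals.build());

Keeping gaps explicit, instead of flattening them into one [min, max] range, appears to be the point of the change: the out-of-order flush tests later in this commit rely on replaying exactly the mutations not covered by any sstable's intervals.
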
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index e765235,9971eaa..c83c2cf
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@@ -26,7 -27,8 +27,8 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.commons.lang3.builder.EqualsBuilder;
  import org.apache.commons.lang3.builder.HashCodeBuilder;
  import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
 -import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -39,11 -41,11 +41,11 @@@ import org.apache.cassandra.utils.Strea
  public class StatsMetadata extends MetadataComponent
  {
      public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
 -    public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer);
++    public static final ISerializer<IntervalSet<CommitLogPosition>> commitLogPositionSetSerializer = IntervalSet.serializer(CommitLogPosition.serializer);
  
      public final EstimatedHistogram estimatedPartitionSize;
      public final EstimatedHistogram estimatedColumnCount;
-     public final CommitLogPosition commitLogLowerBound;
-     public final CommitLogPosition commitLogUpperBound;
 -    public final IntervalSet<ReplayPosition> commitLogIntervals;
++    public final IntervalSet<CommitLogPosition> commitLogIntervals;
      public final long minTimestamp;
      public final long maxTimestamp;
      public final int minLocalDeletionTime;
@@@ -62,8 -64,7 +64,7 @@@
  
      public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                           EstimatedHistogram estimatedColumnCount,
-                          CommitLogPosition commitLogLowerBound,
-                          CommitLogPosition commitLogUpperBound,
 -                         IntervalSet<ReplayPosition> commitLogIntervals,
++                         IntervalSet<CommitLogPosition> commitLogIntervals,
                           long minTimestamp,
                           long maxTimestamp,
                           int minLocalDeletionTime,
@@@ -239,7 -235,7 +235,7 @@@
              int size = 0;
              size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
              size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
-             size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound);
 -            size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE));
++            size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE));
              if (version.storeRows())
                  size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
              else
@@@ -258,7 -254,9 +254,9 @@@
              if (version.storeRows())
                  size += 8 + 8; // totalColumnsSet, totalRows
              if (version.hasCommitLogLowerBound())
-                 size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound);
 -                size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE));
++                size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE));
+             if (version.hasCommitLogIntervals())
 -                size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals);
++                size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals);
              return size;
          }
  
@@@ -266,7 -264,7 +264,7 @@@
          {
              EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
              EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
-             CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out);
 -            ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
++            CommitLogPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE), out);
              out.writeLong(component.minTimestamp);
              out.writeLong(component.maxTimestamp);
              if (version.storeRows())
@@@ -296,7 -294,9 +294,9 @@@
              }
  
              if (version.hasCommitLogLowerBound())
-                 CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out);
 -                ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
++                CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
+             if (version.hasCommitLogIntervals())
 -                replayPositionSetSerializer.serialize(component.commitLogIntervals, out);
++                commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out);
          }
  
          public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@@ -337,7 -337,12 +337,12 @@@
              long totalRows = version.storeRows() ? in.readLong() : -1L;
  
              if (version.hasCommitLogLowerBound())
 -                commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
 -            IntervalSet<ReplayPosition> commitLogIntervals;
 +                commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
++            IntervalSet<CommitLogPosition> commitLogIntervals;
+             if (version.hasCommitLogIntervals())
 -                commitLogIntervals = replayPositionSetSerializer.deserialize(in);
++                commitLogIntervals = commitLogPositionSetSerializer.deserialize(in);
+             else
 -                commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound);
++                commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound);
  
              return new StatsMetadata(partitionSizes,
                                       columnCounts,

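Serialization of the interval set is version-gated, and serializedSize(), serialize() and deserialize() must branch identically or readers of older formats will mis-parse the trailing fields. The shared pattern, condensed from the three hunks above:

    if (version.hasCommitLogLowerBound())   // bound-only formats
        CommitLogPosition.serializer.serialize(
            component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
    if (version.hasCommitLogIntervals())    // newest formats: the full set
        commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out);
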
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 3c8ba64,5f7513f..6686684
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@@ -112,15 -70,11 +112,14 @@@ public class SSTableMetadataViewe
                      out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
                      out.printf("SSTable Level: %d%n", stats.sstableLevel);
                      out.printf("Repaired at: %d%n", stats.repairedAt);
-                     out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
-                     out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+                     out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
 +                    out.printf("totalColumnsSet: %s%n", stats.totalColumnsSet);
 +                    out.printf("totalRows: %s%n", stats.totalRows);
                      out.println("Estimated tombstone drop times:");
 -                    for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
 +
 +                    for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                      {
 -                        out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());
 +                        out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]);
                      }
                      printHistograms(stats, out);
                  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 239077e,02b26c7..2858597
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -250,9 -245,9 +250,10 @@@ public class CommitLogStressTes
              }
              verifySizes(commitLog);
  
-             commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
+             commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
 -                    ReplayPosition.NONE, discardedPos);
++                    CommitLogPosition.NONE, discardedPos);
              threads.clear();
 +
              System.out.format("Discarded at %s\n", discardedPos);
              verifySizes(commitLog);
  

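discardCompletedSegments now takes an interval rather than a single position. The call sites in this commit pass CommitLogPosition.NONE as the lower bound, which should reproduce the old "discard everything flushed up to this position" behaviour:

    commitLog.discardCompletedSegments(
        Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
        CommitLogPosition.NONE,  // lower bound of the flushed interval
        discardedPos);           // upper bound: flushed through here
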
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index cd709d5,d04ca9b..7bcee7a
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -677,9 -648,26 +679,26 @@@ public class Uti
          }
  
          private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
 -                                                                 ReadOrderGroup orderGroup)
 +                                                                 ReadExecutionController controller)
          {
 -            return queryStorage(cfs, orderGroup);
 +            return queryStorage(cfs, controller);
          }
      }
+ 
+     public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
+     {
+         try
+         {
+             for ( ; ; )
+             {
+                 DataDirectory dir = cfs.getDirectories().getWriteableLocation(1);
+                 BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
+             }
+         }
+         catch (IOError e)
+         {
+             // Expected -- marked all directories as unwritable
+         }
+         return () -> BlacklistedDirectories.clearUnwritableUnsafe();
+     }
  }

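markDirectoriesUnwriteable loops until getWriteableLocation throws an IOError, so every data directory ends up blacklisted, and the returned Closeable undoes the marking. Typical use, matching the CommitLogTest changes later in this mail, where a flush inside the block is expected to fail with an FSWriteError at its root:

    try (Closeable c = Util.markDirectoriesUnwriteable(cfs))
    {
        cfs.forceBlockingFlush();  // expected to fail: no writable directory
    }
    catch (Throwable t)
    {
        // unwrap the (possibly wrapped) cause down to the write error
        while (!(t instanceof FSWriteError))
            t = t.getCause();
    }
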
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 23ec58b,9a0ddb8..6ab7d46
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -20,9 -20,13 +20,10 @@@ package org.apache.cassandra.db.commitl
  
  import java.io.*;
  import java.nio.ByteBuffer;
 -import java.util.Arrays;
 -import java.util.Collection;
 -import java.util.Collections;
 -import java.util.UUID;
 +import java.util.*;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
+ import java.util.function.BiConsumer;
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
  
@@@ -35,34 -40,27 +36,34 @@@ import org.junit.runners.Parameterized.
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.Mutation;
- import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.config.Config.DiskFailurePolicy;
+ import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.marshal.AsciiType;
  import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.compress.DeflateCompressor;
  import org.apache.cassandra.io.compress.LZ4Compressor;
  import org.apache.cassandra.io.compress.SnappyCompressor;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.security.EncryptionContextGenerator;
 +import org.apache.cassandra.utils.Hex;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.KillerForTests;
 +import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.vint.VIntCoding;
  
 +import org.junit.After;
  import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
@@@ -247,13 -228,13 +248,13 @@@ public class CommitLogTes
                        .build();
          CommitLog.instance.add(m2);
  
 -        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
 +        assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
-         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
 -        CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext());
++        CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
 -        // Assert we still have both our segment
 -        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
 +        // Assert we still have both our segments
 +        assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
      }
  
      @Test
@@@ -278,9 -258,9 +279,9 @@@
          // "Flush": this won't delete anything
          UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
          CommitLog.instance.sync(true);
-         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
 -        CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++        CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
 -        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 +        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
          Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -298,10 -279,10 +299,10 @@@
          // didn't write anything on cf1 since last flush (and we flush cf2)
  
          UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
-         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
 -        CommitLog.instance.discardCompletedSegments(cfid2, ReplayPosition.NONE, CommitLog.instance.getContext());
++        CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
          // Assert we now have only one segment
 -        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 +        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
      }
  
      private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
@@@ -545,13 -486,13 +546,13 @@@
              for (int i = 0 ; i < 5 ; i++)
                  CommitLog.instance.add(m2);
  
 -            assertEquals(2, CommitLog.instance.activeSegments());
 -            ReplayPosition position = CommitLog.instance.getContext();
 -            for (Keyspace ks : Keyspace.system())
 -                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 -                    CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, ReplayPosition.NONE, position);
 -            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, ReplayPosition.NONE, position);
 -            assertEquals(1, CommitLog.instance.activeSegments());
 +            assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
 +            CommitLogPosition position = CommitLog.instance.getCurrentPosition();
 +            for (Keyspace keyspace : Keyspace.system())
 +                for (ColumnFamilyStore syscfs : keyspace.getColumnFamilyStores())
-                     CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
-             CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
++                    CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, CommitLogPosition.NONE, position);
++            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, CommitLogPosition.NONE, position);
 +            assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
          }
          finally
          {
@@@ -589,108 -530,136 +590,240 @@@
      }
  
      @Test
 +    public void replaySimple() throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm2);
 +
 +        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    @Test
 +    public void replayWithDiscard() throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        CommitLogPosition commitLogPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
 +            CommitLogPosition position = CommitLog.instance.add(rm1);
 +
 +            if (i == discardPosition)
 +                commitLogPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
 +        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class SimpleCountingReplayer extends CommitLogReplayer
 +    {
 +        private final CommitLogPosition filterPosition;
 +        private final CFMetaData metadata;
 +        int cells;
 +        int skipped;
 +
 +        SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
 +        {
 +            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +            this.metadata = cfm;
 +        }
 +
 +        @SuppressWarnings("resource")
 +        @Override
 +        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
 +        {
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +            for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
 +            {
 +                    // Only process mutations for the CFs we're testing against, since we can't deterministically predict
 +                // whether or not system keyspaces will be mutated during a test.
 +                if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
 +                {
 +                    for (Row row : partitionUpdate)
 +                        cells += Iterables.size(row.cells());
 +                }
 +            }
 +        }
 +    }
++
+     public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+ 
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+         try
+         {
+             DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+ 
+             for (int i = 0 ; i < 5 ; i++)
+             {
+                 new RowUpdateBuilder(cfs.metadata, 0, "k")
+                     .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+                     .build()
+                     .apply();
+ 
+                 if (i == 2)
+                 {
+                     try (Closeable c = Util.markDirectoriesUnwriteable(cfs))
+                     {
+                         cfs.forceBlockingFlush();
+                     }
+                     catch (Throwable t)
+                     {
+                         // expected. Cause (after some wrappings) should be a write error
+                         while (!(t instanceof FSWriteError))
+                             t = t.getCause();
+                     }
+                 }
+                 else
+                     cfs.forceBlockingFlush();
+             }
+         }
+         finally
+         {
+             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+         }
+ 
+         CommitLog.instance.sync(true);
+         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
+         // If retries work subsequent flushes should clear up error and this should change to expect 0.
+         Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+     }
+ 
+     public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction)
+             throws ExecutionException, InterruptedException, IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+ 
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         for (int i = 0 ; i < 5 ; i++)
+         {
+             new RowUpdateBuilder(cfs.metadata, 0, "k")
+                 .clustering("c" + i).add("val", ByteBuffer.allocate(100))
+                 .build()
+                 .apply();
+ 
+             Memtable current = cfs.getTracker().getView().getCurrentMemtable();
+             if (i == 2)
+                 current.makeUnflushable();
+ 
+             flushAction.accept(cfs, current);
+         }
+         if (performCompaction)
+             cfs.forceMajorCompaction();
+         // Make sure metadata saves and reads fine
+         for (SSTableReader reader : cfs.getLiveSSTables())
+             reader.reloadSSTableMetadata();
+ 
+         CommitLog.instance.sync(true);
+         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
+         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
+         // persisted all data in the commit log. Because we know there was an error, there must be something left to
+         // replay.
+         Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
+     }
+ 
+     BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) ->
+     {
+         try
+         {
+             cfs.forceBlockingFlush();
+         }
+         catch (Throwable t)
+         {
+             // expected after makeUnflushable. Cause (after some wrappings) should be a write error
+             while (!(t instanceof FSWriteError))
+                 t = t.getCause();
+             // Wait for started flushes to complete.
+             cfs.switchMemtableIfCurrent(current);
+         }
+     };
+ 
+     BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) ->
+     {
+         // Move to new commit log segment and try to flush all data. Also delete segments that no longer contain
+         // flushed data.
+         // This does not stop on errors and should retain segments for which flushing failed.
+         CommitLog.instance.forceRecycleAllSegments();
+ 
+         // Wait for started flushes to complete.
+         cfs.switchMemtableIfCurrent(current);
+     };
+ 
+     @Test
+     public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(flush, false);
+     }
+ 
+     @Test
+     public void testOutOfOrderLogDiscard() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(recycleSegments, false);
+     }
+ 
+     @Test
+     public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(flush, true);
+     }
+ 
+     @Test
+     public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException
+     {
+         testOutOfOrderFlushRecovery(recycleSegments, true);
+     }
  }
  

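The replay tests above share one pattern: subclass CommitLogReplayer, count cells in handleMutation for entries past a filter position, then feed it the on-disk segment files. Condensed from replaySimple:

    SimpleCountingReplayer replayer =
        new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
    List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
    File[] files = new File(CommitLog.instance.segmentManager.storageDirectory)
                       .listFiles((dir, name) -> activeSegments.contains(name));
    replayer.replayFiles(files);
    assertEquals(cellCount, replayer.cells);
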
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 1668ddc,479e4e2..84e3e05
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -299,14 -298,10 +299,11 @@@ public class TrackerTes
          Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
  
          SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
 -        tracker.replaceFlushed(prev2, Collections.singleton(reader));
 +        tracker.replaceFlushed(prev2, singleton(reader));
          Assert.assertEquals(1, tracker.getView().sstables.size());
-         Assert.assertEquals(1, tracker.getView().premature.size());
-         tracker.permitCompactionOfFlushed(singleton(reader));
-         Assert.assertEquals(0, tracker.getView().premature.size());
 -        Assert.assertEquals(1, listener.received.size());
 -        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
 +        Assert.assertEquals(2, listener.received.size());
 +        Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
 +        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
          listener.received.clear();
          Assert.assertTrue(reader.isKeyCacheSetup());
          Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@@ -324,13 -319,10 +321,12 @@@
          Assert.assertEquals(0, tracker.getView().sstables.size());
          Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
          Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
-         System.out.println(listener.received);
-         Assert.assertEquals(4, listener.received.size());
 -        Assert.assertEquals(3, listener.received.size());
 -        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
 -        Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
 -        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
++        Assert.assertEquals(5, listener.received.size());
 +        Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
 +        Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
-         Assert.assertTrue(listener.received.get(2) instanceof SSTableDeletingNotification);
-         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(3)).removed.size());
++        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added);
++        Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
++        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
          DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
      }
  

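For reference, the notification sequence the rewritten TrackerTest assertions encode for the drop path; the memtable events now come first because replaceFlushed fires notifyDiscarded before notifyAdded (see the Tracker hunk at the top of this mail):

    0: MemtableSwitchedNotification    (prev1)
    1: MemtableDiscardedNotification   (prev1)
    2: SSTableAddedNotification        (the flushed reader)
    3: SSTableDeletingNotification
    4: SSTableListChangedNotification  (one sstable removed)
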
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index a3382c4,de12d57..4bd4489
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@@ -32,7 -32,8 +32,8 @@@ import org.apache.cassandra.SchemaLoade
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.SerializationHeader;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
 -import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.dht.RandomPartitioner;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
@@@ -85,8 -86,7 +86,7 @@@ public class MetadataSerializerTes
  
          CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1");
          MetadataCollector collector = new MetadataCollector(cfm.comparator)
-                                           .commitLogLowerBound(cllb)
-                                           .commitLogUpperBound(club);
 -                                          .commitLogIntervals(new IntervalSet(cllb, club));
++                                          .commitLogIntervals(new IntervalSet<>(cllb, club));
  
          String partitioner = RandomPartitioner.class.getCanonicalName();
          double bfFpChance = 0.1;