You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/11/15 22:26:16 UTC

git commit: SSTable/SSTableReader cleanup

Updated Branches:
  refs/heads/trunk fab27bd59 -> f388c9d69


SSTable/SSTableReader cleanup

patch by yukim; reviewed by jbellis for CASSANDRA-6355


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f388c9d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f388c9d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f388c9d6

Branch: refs/heads/trunk
Commit: f388c9d69b855f0c3b146864717a971034fd3dc5
Parents: fab27bd
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Nov 15 15:23:55 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Nov 15 15:23:55 2013 -0600

----------------------------------------------------------------------
 .../cassandra/db/CollationController.java       |  5 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  4 +-
 .../db/compaction/CompactionController.java     |  3 +-
 .../cassandra/db/compaction/CompactionTask.java |  6 +-
 .../compaction/LeveledCompactionStrategy.java   |  3 +-
 .../db/compaction/LeveledManifest.java          | 14 ++--
 .../cassandra/db/compaction/Upgrader.java       |  9 +--
 .../apache/cassandra/io/sstable/Component.java  |  6 --
 .../cassandra/io/sstable/KeyIterator.java       |  2 +-
 .../apache/cassandra/io/sstable/SSTable.java    | 55 ++-------------
 .../cassandra/io/sstable/SSTableMetadata.java   |  2 +-
 .../cassandra/io/sstable/SSTableReader.java     | 73 ++++++++++++++------
 .../cassandra/io/sstable/SSTableWriter.java     | 12 ++--
 .../io/util/DataIntegrityMetadata.java          |  4 +-
 .../LongLeveledCompactionStrategyTest.java      |  3 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  2 +-
 .../LeveledCompactionStrategyTest.java          |  5 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  2 +-
 18 files changed, 88 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 758d523..9896fde 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.tracing.Tracing;
@@ -99,7 +98,7 @@ public class CollationController
             QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
 
             /* add the SSTables on disk */
-            Collections.sort(view.sstables, SSTable.maxTimestampComparator);
+            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 
             // read sorted sstables
             long mostRecentRowTombstone = Long.MIN_VALUE;
@@ -219,7 +218,7 @@ public class CollationController
              * In othere words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination
              * in one pass, and minimize the number of sstables for which we read a rowTombstone.
              */
-            Collections.sort(view.sstables, SSTable.maxTimestampComparator);
+            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
             List<SSTableReader> skippedSSTables = null;
             long mostRecentRowTombstone = Long.MIN_VALUE;
             long minTimestamp = Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 73704d4..1b8a1bf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -422,7 +422,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();
 
-            if (components.contains(Component.COMPACTED_MARKER) || desc.temporary)
+            if (desc.temporary)
             {
                 SSTable.delete(desc, components);
                 continue;
@@ -1010,7 +1010,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         if (operation != OperationType.CLEANUP || isIndex())
         {
-            return SSTable.getTotalBytes(sstables);
+            return SSTableReader.getTotalBytes(sstables);
         }
 
         // cleanup size estimation only counts bytes for keys local to this node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 201cd0a..dc7730c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -118,7 +117,7 @@ public class CompactionController
         // we still need to keep candidates that might shadow something in a
         // non-candidate sstable. And if we remove a sstable from the candidates, we
         // must take it's timestamp into account (hence the sorting below).
-        Collections.sort(candidates, SSTable.maxTimestampComparator);
+        Collections.sort(candidates, SSTableReader.maxTimestampComparator);
 
         Iterator<SSTableReader> iterator = candidates.iterator();
         while (iterator.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index bc419ad..f4cc500 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -118,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
         long totalkeysWritten = 0;
 
         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
-        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize());
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize());
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
@@ -244,8 +244,8 @@ public class CompactionTask extends AbstractCompactionTask
 
         // log a bunch of statistics about the result and save to system table compaction_history
         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTable.getTotalBytes(toCompact);
-        long endsize = SSTable.getTotalBytes(sstables);
+        long startsize = SSTableReader.getTotalBytes(toCompact);
+        long endsize = SSTableReader.getTotalBytes(sstables);
         double ratio = (double) endsize / (double) startsize;
 
         StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index e992003..3eb5e91 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -234,7 +233,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             }
 
             totalLength = length;
-            Collections.sort(this.sstables, SSTable.sstableComparator);
+            Collections.sort(this.sstables, SSTableReader.sstableComparator);
             sstableIterator = this.sstables.iterator();
             assert sstableIterator.hasNext(); // caller should check intersecting first
             currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 63ad2e4..2b79493 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -151,13 +151,13 @@ public class LeveledManifest
             minLevel = Math.min(minLevel, ssTableReader.getSSTableLevel());
             add(ssTableReader);
         }
-        lastCompactedKeys[minLevel] = SSTable.sstableOrdering.max(added).last;
+        lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
     }
 
     public synchronized void repairOverlappingSSTables(int level)
     {
         SSTableReader previous = null;
-        Collections.sort(generations[level], SSTable.sstableComparator);
+        Collections.sort(generations[level], SSTableReader.sstableComparator);
         List<SSTableReader> outOfOrderSSTables = new ArrayList<SSTableReader>();
         for (SSTableReader current : generations[level])
         {
@@ -264,7 +264,7 @@ public class LeveledManifest
             // we want to calculate score excluding compacting ones
             Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
             Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting());
-            double score = (double)SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
+            double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
             logger.debug("Compaction score for level {} is {}", i, score);
 
             if (score > 1.001)
@@ -454,7 +454,7 @@ public class LeveledManifest
             }
 
             // leave everything in L0 if we didn't end up with a full sstable's worth of data
-            if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
+            if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes)
             {
                 // add sstables from L1 that overlap candidates
                 // if the overlapping ones are already busy in a compaction, leave it out.
@@ -468,7 +468,7 @@ public class LeveledManifest
         }
 
         // for non-L0 compactions, pick up where we left off last time
-        Collections.sort(generations[level], SSTable.sstableComparator);
+        Collections.sort(generations[level], SSTableReader.sstableComparator);
         int start = 0; // handles case where the prior compaction touched the very last range
         for (int i = 0; i < generations[level].size(); i++)
         {
@@ -499,7 +499,7 @@ public class LeveledManifest
     private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
     {
         List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates);
-        Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator);
+        Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
         return ageSortedCandidates;
     }
 
@@ -557,7 +557,7 @@ public class LeveledManifest
         }
 
         int newLevel;
-        if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTable.getTotalBytes(sstables) < maxSSTableSizeInBytes)
+        if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTableReader.getTotalBytes(sstables) < maxSSTableSizeInBytes)
         {
             newLevel = 0;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index b98c2ae..383ff00 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -25,13 +25,6 @@ import com.google.common.base.Throwables;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.AbstractCompactionIterable;
-import org.apache.cassandra.db.compaction.CompactionIterable;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.CompactionTask;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
@@ -63,7 +56,7 @@ public class Upgrader
 
         this.strategy = cfs.getCompactionStrategy();
         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade, cfs.metadata));
-        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize());
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize());
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 599e0ba..4635251 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -43,10 +43,6 @@ public class Component
         PRIMARY_INDEX("Index.db"),
         // serialized bloom filter for the row keys in the sstable
         FILTER("Filter.db"),
-        // 0-length file that is created when an sstable is ready to be deleted
-        // @deprecated: deletion of compacted file is based on the lineag information stored in the compacted sstabl
-        // metadata. This ensure we can guarantee never using a sstable and some of its parents, even in case of failure.
-        COMPACTED_MARKER("Compacted"),
         // file to hold information about uncompressed data length, chunk offsets etc.
         COMPRESSION_INFO("CompressionInfo.db"),
         // statistical metadata about the content of the sstable
@@ -81,7 +77,6 @@ public class Component
     public final static Component DATA = new Component(Type.DATA);
     public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
     public final static Component FILTER = new Component(Type.FILTER);
-    public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER);
     public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
     public final static Component STATS = new Component(Type.STATS);
     public final static Component DIGEST = new Component(Type.DIGEST);
@@ -133,7 +128,6 @@ public class Component
             case DATA:              component = Component.DATA;                         break;
             case PRIMARY_INDEX:     component = Component.PRIMARY_INDEX;                break;
             case FILTER:            component = Component.FILTER;                       break;
-            case COMPACTED_MARKER:  component = Component.COMPACTED_MARKER;             break;
             case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
             case STATS:             component = Component.STATS;                        break;
             case DIGEST:            component = Component.DIGEST;                       break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index f4f7ee5..0c36f62 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -35,7 +35,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
 
     public KeyIterator(Descriptor desc)
     {
-        File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+        File path = new File(desc.filenameFor(Component.PRIMARY_INDEX));
         in = RandomAccessReader.open(path);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index e469b33..c13c423 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.slf4j.Logger;
@@ -57,27 +56,10 @@ public abstract class SSTable
 {
     static final Logger logger = LoggerFactory.getLogger(SSTable.class);
 
-    // TODO: replace with 'Component' objects
-    public static final String COMPONENT_DATA = Component.Type.DATA.repr;
-    public static final String COMPONENT_INDEX = Component.Type.PRIMARY_INDEX.repr;
-    public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
-    public static final String COMPONENT_STATS = Component.Type.STATS.repr;
-    public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
-
     public static final String TEMPFILE_MARKER = "tmp";
 
     public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
 
-    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            long ts1 = o1.getMaxTimestamp();
-            long ts2 = o2.getMaxTimestamp();
-            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
-        }
-    };
-
     public final Descriptor descriptor;
     protected final Set<Component> components;
     public final CFMetaData metadata;
@@ -101,26 +83,13 @@ public abstract class SSTable
         assert partitioner != null;
 
         this.descriptor = descriptor;
-        Set<Component> dataComponents = new HashSet<Component>(components);
-        for (Component component : components)
-            assert component.type != Component.Type.COMPACTED_MARKER;
-
+        Set<Component> dataComponents = new HashSet<>(components);
         this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
-        this.components = new CopyOnWriteArraySet<Component>(dataComponents);
+        this.components = new CopyOnWriteArraySet<>(dataComponents);
         this.metadata = metadata;
         this.partitioner = partitioner;
     }
 
-    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            return o1.first.compareTo(o2.first);
-        }
-    };
-
-    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
-
     /**
      * We use a ReferenceQueue to manage deleting files that have been compacted
      * and for which no more SSTable references exist.  But this is not guaranteed
@@ -139,15 +108,11 @@ public abstract class SSTable
             FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
         for (Component component : components)
         {
-            if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY))
+            if (component.equals(Component.DATA) || component.equals(Component.SUMMARY))
                 continue;
 
             FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
-        // remove the COMPACTED_MARKER component last if it exists
-        // Note: newly created sstable should not have a marker, but we keep this for now to make sure
-        // we don't leave older marker around
-        FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
         FileUtils.delete(desc.filenameFor(Component.SUMMARY));
 
         logger.debug("Deleted {}", desc);
@@ -167,12 +132,12 @@ public abstract class SSTable
 
     public String getFilename()
     {
-        return descriptor.filenameFor(COMPONENT_DATA);
+        return descriptor.filenameFor(Component.DATA);
     }
 
     public String getIndexFilename()
     {
-        return descriptor.filenameFor(COMPONENT_INDEX);
+        return descriptor.filenameFor(Component.PRIMARY_INDEX);
     }
 
     public String getColumnFamilyName()
@@ -262,16 +227,6 @@ public abstract class SSTable
         return estimatedRows;
     }
 
-    public static long getTotalBytes(Iterable<SSTableReader> sstables)
-    {
-        long sum = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            sum += sstable.onDiskLength();
-        }
-        return sum;
-    }
-
     public long bytesOnDisk()
     {
         long bytes = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index 140e08b..8ddfdd7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -435,7 +435,7 @@ public class SSTableMetadata
         public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
         {
             logger.debug("Load metadata for {}", descriptor);
-            File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
+            File statsFile = new File(descriptor.filenameFor(Component.STATS));
             if (!statsFile.exists())
             {
                 logger.debug("No sstable stats for {}", descriptor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c5b61d9..7fd9ca6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
@@ -67,8 +68,28 @@ public class SSTableReader extends SSTable implements Closeable
     private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
     private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 
+    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            long ts1 = o1.getMaxTimestamp();
+            long ts2 = o2.getMaxTimestamp();
+            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
+        }
+    };
+
+    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return o1.first.compareTo(o2.first);
+        }
+    };
+
+    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+
     /**
-     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound
+     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
      * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
      * later than maxDataAge.
      *
@@ -168,7 +189,7 @@ public class SSTableReader extends SSTable implements Closeable
         SegmentedFile.Builder dbuilder = sstable.compression
                                        ? new CompressedSegmentedFile.Builder()
                                        : new BufferedSegmentedFile.Builder();
-        if (!loadSummary(sstable, ibuilder, dbuilder, sstable.metadata))
+        if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
             sstable.buildSummary(false, ibuilder, dbuilder, false);
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
@@ -218,7 +239,7 @@ public class SSTableReader extends SSTable implements Closeable
         assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 
         SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
 
@@ -247,7 +268,7 @@ public class SSTableReader extends SSTable implements Closeable
                                                       final CFMetaData metadata,
                                                       final IPartitioner partitioner)
     {
-        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
+        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 
         ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
         for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
@@ -365,6 +386,16 @@ public class SSTableReader extends SSTable implements Closeable
         this.bf = bloomFilter;
     }
 
+    public static long getTotalBytes(Iterable<SSTableReader> sstables)
+    {
+        long sum = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            sum += sstable.onDiskLength();
+        }
+        return sum;
+    }
+
     /**
      * Clean up all opened resources.
      *
@@ -441,18 +472,18 @@ public class SSTableReader extends SSTable implements Closeable
                                          ? SegmentedFile.getCompressedBuilder()
                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
-        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
+        boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
         if (recreateBloomFilter || !summaryLoaded)
             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
 
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
         if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
-            saveSummary(this, ibuilder, dbuilder);
+            saveSummary(ibuilder, dbuilder);
     }
 
-     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
-     {
+    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+    {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 
@@ -505,27 +536,27 @@ public class SSTableReader extends SSTable implements Closeable
         last = getMinimalKey(last);
     }
 
-    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
+    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
     {
-        File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
-        if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
+        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+        if (!descriptor.version.offHeapSummaries || !summariesFile.exists())
             return false;
 
         DataInputStream iStream = null;
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            reader.indexSummary = IndexSummary.serializer.deserialize(iStream, reader.partitioner);
-            if (reader.indexSummary.getIndexInterval() != metadata.getIndexInterval())
+            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner);
+            if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
             {
                 iStream.close();
                 logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.",
-                             reader.toString(), reader.indexSummary.getIndexInterval(), metadata.getIndexInterval());
+                             toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval());
                 FileUtils.deleteWithConfirm(summariesFile);
                 return false;
             }
-            reader.first = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
-            reader.last = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
             dbuilder.deserializeBounds(iStream);
         }
@@ -544,9 +575,9 @@ public class SSTableReader extends SSTable implements Closeable
         return true;
     }
 
-    public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
-        File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (summariesFile.exists())
             summariesFile.delete();
 
@@ -554,9 +585,9 @@ public class SSTableReader extends SSTable implements Closeable
         try
         {
             oStream = new DataOutputStream(new FileOutputStream(summariesFile));
-            IndexSummary.serializer.serialize(reader.indexSummary, oStream);
-            ByteBufferUtil.writeWithLength(reader.first.key, oStream);
-            ByteBufferUtil.writeWithLength(reader.last.key, oStream);
+            IndexSummary.serializer.serialize(indexSummary, oStream);
+            ByteBufferUtil.writeWithLength(first.key, oStream);
+            ByteBufferUtil.writeWithLength(last.key, oStream);
             ibuilder.serializeBounds(oStream);
             dbuilder.serializeBounds(oStream);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 70c0b42..b5d50cf 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -313,8 +313,8 @@ public class SSTableWriter extends SSTable
         SSTableMetadata sstableMetadata = p.right;
 
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
-        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
+        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
         SSTableReader sstable = SSTableReader.internalOpen(newdesc,
                                                            components,
                                                            metadata,
@@ -328,7 +328,7 @@ public class SSTableWriter extends SSTable
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
         // try to save the summaries to disk
-        SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder);
+        sstable.saveSummary(iwriter.builder, dbuilder);
         iwriter = null;
         dbuilder = null;
         return sstable;
@@ -355,7 +355,7 @@ public class SSTableWriter extends SSTable
 
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata,  Set<Integer> ancestors)
     {
-        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
+        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
         try
         {
             SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream);
@@ -411,7 +411,7 @@ public class SSTableWriter extends SSTable
 
         IndexWriter(long keyCount)
         {
-            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)),
+            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
                                               !metadata.populateIoCacheOnFlush());
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
             summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval());
@@ -446,7 +446,7 @@ public class SSTableWriter extends SSTable
         {
             if (components.contains(Component.FILTER))
             {
-                String path = descriptor.filenameFor(SSTable.COMPONENT_FILTER);
+                String path = descriptor.filenameFor(Component.FILTER);
                 try
                 {
                     // bloom filter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index f334d08..0606941 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -145,10 +145,10 @@ public class DataIntegrityMetadata
             byte[] bytes = digest.digest();
             if (bytes == null)
                 return;
-            SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
+            SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(Component.DIGEST)), true);
             // Writting output compatible with sha1sum
             Descriptor newdesc = descriptor.asTemporary(false);
-            String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
+            String[] tmp = newdesc.filenameFor(Component.DATA).split(Pattern.quote(File.separator));
             String dataFileName = tmp[tmp.length - 1];
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index c6b6eb0..0eb44d0 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -106,7 +105,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
         {
             List<SSTableReader> sstables = manifest.getLevel(level);
             // score check
-            assert (double) SSTable.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00;
+            assert (double) SSTableReader.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00;
             // overlap check for levels greater than 0
             if (level > 0)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 32bc7df..7f7d5c9 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -938,7 +938,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         cfs.clearUnsafe();
         assertEquals(0, cfs.getSSTables().size());
 
-        new File(ssTables.iterator().next().descriptor.filenameFor(SSTable.COMPONENT_STATS)).delete();
+        new File(ssTables.iterator().next().descriptor.filenameFor(Component.STATS)).delete();
         cfs.loadNewSSTables();
 
         // Add another column with a lower timestamp

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index b60f6d9..a008de1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.UUID;
 
@@ -39,7 +37,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
@@ -145,7 +142,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
             scanner.next();
 
         // scanner.getCurrentPosition should be equal to total bytes of L1 sstables
-        assert scanner.getCurrentPosition() == SSTable.getTotalBytes(sstables);
+        assert scanner.getCurrentPosition() == SSTableReader.getTotalBytes(sstables);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 36d8fbe..b771e72 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -278,7 +278,7 @@ public class SSTableReaderTest extends SchemaLoader
         SegmentedFile.Builder dbuilder = sstable.compression
                                           ? SegmentedFile.getCompressedBuilder()
                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-        SSTableReader.saveSummary(sstable, ibuilder, dbuilder);
+        sstable.saveSummary(ibuilder, dbuilder);
 
         SSTableReader reopened = SSTableReader.open(sstable.descriptor);
         assert reopened.first.token instanceof LocalToken;