You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:50:01 UTC

[08/11] cassandra git commit: Remove pre-3.0 compatibility code for 4.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4c0608f..5baf783 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -509,16 +509,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         {
             public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
             {
-                if (version >= MessagingService.VERSION_30)
-                    out.writeByte(expression.kind().ordinal());
+                out.writeByte(expression.kind().ordinal());
 
                 // Custom expressions include neither a column or operator, but all
-                // other expressions do. Also, custom expressions are 3.0+ only, so
-                // the column & operator will always be the first things written for
-                // any pre-3.0 version
+                // other expressions do.
                 if (expression.kind() == Kind.CUSTOM)
                 {
-                    assert version >= MessagingService.VERSION_30;
                     IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version);
                     ByteBufferUtil.writeWithShortLength(expression.value, out);
                     return;
@@ -526,7 +522,6 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
 
                 if (expression.kind() == Kind.USER)
                 {
-                    assert version >= MessagingService.VERSION_30;
                     UserExpression.serialize((UserExpression)expression, out, version);
                     return;
                 }
@@ -541,15 +536,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                         break;
                     case MAP_EQUALITY:
                         MapEqualityExpression mexpr = (MapEqualityExpression)expression;
-                        if (version < MessagingService.VERSION_30)
-                        {
-                            ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
-                        }
-                        else
-                        {
-                            ByteBufferUtil.writeWithShortLength(mexpr.key, out);
-                            ByteBufferUtil.writeWithShortLength(mexpr.value, out);
-                        }
+                        ByteBufferUtil.writeWithShortLength(mexpr.key, out);
+                        ByteBufferUtil.writeWithShortLength(mexpr.value, out);
                         break;
                     case THRIFT_DYN_EXPR:
                         ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
@@ -559,62 +547,33 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
 
             public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
             {
-                Kind kind = null;
-                ByteBuffer name;
-                Operator operator;
-                ColumnDefinition column;
+                Kind kind = Kind.values()[in.readByte()];
 
-                if (version >= MessagingService.VERSION_30)
+                // custom expressions do not contain a column or operator, only a target index and a value
+                if (kind == Kind.CUSTOM)
                 {
-                    kind = Kind.values()[in.readByte()];
-                    // custom expressions (3.0+ only) do not contain a column or operator, only a value
-                    if (kind == Kind.CUSTOM)
-                    {
-                        return new CustomExpression(metadata,
-                                                    IndexMetadata.serializer.deserialize(in, version, metadata),
-                                                    ByteBufferUtil.readWithShortLength(in));
-                    }
-
-                    if (kind == Kind.USER)
-                    {
-                        return UserExpression.deserialize(in, version, metadata);
-                    }
+                    return new CustomExpression(metadata,
+                            IndexMetadata.serializer.deserialize(in, version, metadata),
+                            ByteBufferUtil.readWithShortLength(in));
                 }
 
-                name = ByteBufferUtil.readWithShortLength(in);
-                operator = Operator.readFrom(in);
-                column = metadata.getColumnDefinition(name);
+                if (kind == Kind.USER)
+                    return UserExpression.deserialize(in, version, metadata);
+
+                ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+                Operator operator = Operator.readFrom(in);
+                ColumnDefinition column = metadata.getColumnDefinition(name);
+
                 if (!metadata.isCompactTable() && column == null)
                     throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
 
-                if (version < MessagingService.VERSION_30)
-                {
-                    if (column == null)
-                        kind = Kind.THRIFT_DYN_EXPR;
-                    else if (column.type instanceof MapType && operator == Operator.EQ)
-                        kind = Kind.MAP_EQUALITY;
-                    else
-                        kind = Kind.SIMPLE;
-                }
-
-                assert kind != null;
                 switch (kind)
                 {
                     case SIMPLE:
                         return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
                     case MAP_EQUALITY:
-                        ByteBuffer key, value;
-                        if (version < MessagingService.VERSION_30)
-                        {
-                            ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
-                            key = CompositeType.extractComponent(composite, 0);
-                            value = CompositeType.extractComponent(composite, 0);
-                        }
-                        else
-                        {
-                            key = ByteBufferUtil.readWithShortLength(in);
-                            value = ByteBufferUtil.readWithShortLength(in);
-                        }
+                        ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+                        ByteBuffer value = ByteBufferUtil.readWithShortLength(in);
                         return new MapEqualityExpression(column, key, operator, value);
                     case THRIFT_DYN_EXPR:
                         return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
@@ -622,16 +581,12 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                 throw new AssertionError();
             }
 
-
             public long serializedSize(Expression expression, int version)
             {
-                // version 3.0+ includes a byte for Kind
-                long size = version >= MessagingService.VERSION_30 ? 1 : 0;
+                long size = 1; // kind byte
 
                 // Custom expressions include neither a column or operator, but all
-                // other expressions do. Also, custom expressions are 3.0+ only, so
-                // the column & operator will always be the first things written for
-                // any pre-3.0 version
+                // other expressions do.
                 if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER)
                     size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
                             + expression.operator.serializedSize();
@@ -643,23 +598,19 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                         break;
                     case MAP_EQUALITY:
                         MapEqualityExpression mexpr = (MapEqualityExpression)expression;
-                        if (version < MessagingService.VERSION_30)
-                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue());
-                        else
-                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
-                                  + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
+                        size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
+                              + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
                         break;
                     case THRIFT_DYN_EXPR:
                         size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value);
                         break;
                     case CUSTOM:
-                        if (version >= MessagingService.VERSION_30)
-                            size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
-                                   + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
+                        size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
+                               + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
                         break;
                     case USER:
-                        if (version >= MessagingService.VERSION_30)
-                            size += UserExpression.serializedSize((UserExpression)expression, version);
+                        size += UserExpression.serializedSize((UserExpression)expression, version);
+                        break;
                 }
                 return size;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index b95a310..1ed961f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -238,12 +238,10 @@ public class PartitionUpdate extends AbstractBTreePartition
      *
      * @param bytes the byte buffer that contains the serialized update.
      * @param version the version with which the update is serialized.
-     * @param key the partition key for the update. This is only used if {@code version &lt 3.0}
-     * and can be {@code null} otherwise.
      *
      * @return the deserialized update or {@code null} if {@code bytes == null}.
      */
-    public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key)
+    public static PartitionUpdate fromBytes(ByteBuffer bytes, int version)
     {
         if (bytes == null)
             return null;
@@ -252,8 +250,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         {
             return serializer.deserialize(new DataInputBuffer(bytes, true),
                                           version,
-                                          SerializationHelper.Flag.LOCAL,
-                                          version < MessagingService.VERSION_30 ? key : null);
+                                          SerializationHelper.Flag.LOCAL);
         }
         catch (IOException e)
         {
@@ -780,47 +777,12 @@ public class PartitionUpdate extends AbstractBTreePartition
             {
                 assert !iter.isReverseOrder();
 
-                if (version < MessagingService.VERSION_30)
-                {
-                    LegacyLayout.serializeAsLegacyPartition(null, iter, out, version);
-                }
-                else
-                {
-                    CFMetaData.serializer.serialize(update.metadata(), out, version);
-                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount());
-                }
+                CFMetaData.serializer.serialize(update.metadata(), out, version);
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount());
             }
         }
 
-        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
-        {
-            if (version >= MessagingService.VERSION_30)
-            {
-                assert key == null; // key is only there for the old format
-                return deserialize30(in, version, flag);
-            }
-            else
-            {
-                assert key != null;
-                return deserializePre30(in, version, flag, key);
-            }
-        }
-
-        // Used to share same decorated key between updates.
-        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
-        {
-            if (version >= MessagingService.VERSION_30)
-            {
-                return deserialize30(in, version, flag);
-            }
-            else
-            {
-                assert key != null;
-                return deserializePre30(in, version, flag, key.getKey());
-            }
-        }
-
-        private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+        public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
         {
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
             UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag);
@@ -854,22 +816,10 @@ public class PartitionUpdate extends AbstractBTreePartition
                                        false);
         }
 
-        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
-        {
-            try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
-            {
-                assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
-                return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata()));
-            }
-        }
-
         public long serializedSize(PartitionUpdate update, int version)
         {
             try (UnfilteredRowIterator iter = update.unfilteredIterator())
             {
-                if (version < MessagingService.VERSION_30)
-                    return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version);
-
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 852d95e..bcc8d4d 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -252,13 +252,11 @@ public abstract class UnfilteredPartitionIterators
     /**
      * Digests the the provided iterator.
      *
-     * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
-     * as this is only used when producing digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
+    public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
     {
         try (UnfilteredPartitionIterator iter = iterator)
         {
@@ -266,7 +264,7 @@ public abstract class UnfilteredPartitionIterators
             {
                 try (UnfilteredRowIterator partition = iter.next())
                 {
-                    UnfilteredRowIterators.digest(command, partition, digest, version);
+                    UnfilteredRowIterators.digest(partition, digest, version);
                 }
             }
         }
@@ -303,8 +301,6 @@ public abstract class UnfilteredPartitionIterators
     {
         public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
         {
-            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer
-
             out.writeBoolean(iter.isForThrift());
             while (iter.hasNext())
             {
@@ -319,7 +315,6 @@ public abstract class UnfilteredPartitionIterators
 
         public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
         {
-            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directy in ReadResponse.LegacyRangeSliceReplySerializer
             final boolean isForThrift = in.readBoolean();
 
             return new AbstractUnfilteredPartitionIterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
index 14730ac..30a0a37 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java
@@ -211,12 +211,11 @@ public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilt
 
     /**
      * @return true if we can use the clustering values in the stats of the sstable:
-     * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size)
-     * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
+     * we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones
      */
     private boolean canUseMetadataLowerBound()
     {
-        return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile();
+        return !sstable.hasTombstones();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 46447ec..004783e 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -143,20 +143,12 @@ public abstract class UnfilteredRowIterators
     /**
      * Digests the partition represented by the provided iterator.
      *
-     * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
-     * as this is only used when producing digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
+    public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version)
     {
-        if (version < MessagingService.VERSION_30)
-        {
-            LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
-            return;
-        }
-
         digest.update(iterator.partitionKey().getKey().duplicate());
         iterator.partitionLevelDeletion().digest(digest);
         iterator.columns().regulars.digest(digest);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index 298c316..7a603b0 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -184,6 +184,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
              * The first int tells us if it's a range or bounds (depending on the value) _and_ if it's tokens or keys (depending on the
              * sign). We use negative kind for keys so as to preserve the serialization of token from older version.
              */
+            // !WARNING! Although we no longer support the pre-3.0 messaging protocol, we still serialize token
+            // ranges in the system table (see SystemKeyspace.rangeToBytes) using the old/pre-3.0 format, so until
+            // that problem is dealt with, we have to preserve this code.
             if (version < MessagingService.VERSION_30)
                 out.writeInt(kindInt(range));
             else
@@ -195,6 +198,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
         public AbstractBounds<T> deserialize(DataInput in, IPartitioner p, int version) throws IOException
         {
             boolean isToken, startInclusive, endInclusive;
+            // !WARNING! See serialize method above for why we still need to have that condition.
             if (version < MessagingService.VERSION_30)
             {
                 int kind = in.readInt();
@@ -226,6 +230,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
 
         public long serializedSize(AbstractBounds<T> ab, int version)
         {
+            // !WARNING! See serialize method above for why we still need to have that condition.
             int size = version < MessagingService.VERSION_30
                      ? TypeSizes.sizeof(kindInt(ab))
                      : 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 212f88c..c4c3872 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -979,12 +979,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private void markAlive(final InetAddress addr, final EndpointState localState)
     {
-        if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20)
-        {
-            realMarkAlive(addr, localState);
-            return;
-        }
-
         localState.markDead();
 
         MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
deleted file mode 100644
index 50d8b6e..0000000
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hints;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * A migrator that goes through the legacy system.hints table and writes all the hints to the new hints storage format.
- */
-@SuppressWarnings("deprecation")
-public final class LegacyHintsMigrator
-{
-    private static final Logger logger = LoggerFactory.getLogger(LegacyHintsMigrator.class);
-
-    private final File hintsDirectory;
-    private final long maxHintsFileSize;
-
-    private final ColumnFamilyStore legacyHintsTable;
-    private final int pageSize;
-
-    public LegacyHintsMigrator(File hintsDirectory, long maxHintsFileSize)
-    {
-        this.hintsDirectory = hintsDirectory;
-        this.maxHintsFileSize = maxHintsFileSize;
-
-        legacyHintsTable = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
-        pageSize = calculatePageSize(legacyHintsTable);
-    }
-
-    // read fewer columns (mutations) per page if they are very large
-    private static int calculatePageSize(ColumnFamilyStore legacyHintsTable)
-    {
-        int size = 128;
-
-        int meanCellCount = legacyHintsTable.getMeanColumns();
-        double meanPartitionSize = legacyHintsTable.getMeanPartitionSize();
-
-        if (meanCellCount != 0 && meanPartitionSize != 0)
-        {
-            int avgHintSize = (int) meanPartitionSize / meanCellCount;
-            size = Math.max(2, Math.min(size, (512 << 10) / avgHintSize));
-        }
-
-        return size;
-    }
-
-    public void migrate()
-    {
-        // nothing to migrate
-        if (legacyHintsTable.isEmpty())
-            return;
-        logger.info("Migrating legacy hints to new storage");
-
-        // major-compact all of the existing sstables to get rid of the tombstones + expired hints
-        logger.info("Forcing a major compaction of {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
-        compactLegacyHints();
-
-        // paginate over legacy hints and write them to the new storage
-        logger.info("Writing legacy hints to the new storage");
-        migrateLegacyHints();
-
-        // truncate the legacy hints table
-        logger.info("Truncating {}.{} table", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
-        legacyHintsTable.truncateBlocking();
-    }
-
-    private void compactLegacyHints()
-    {
-        Collection<Descriptor> descriptors = new ArrayList<>();
-        legacyHintsTable.getTracker().getUncompacting().forEach(sstable -> descriptors.add(sstable.descriptor));
-        if (!descriptors.isEmpty())
-            forceCompaction(descriptors);
-    }
-
-    private void forceCompaction(Collection<Descriptor> descriptors)
-    {
-        try
-        {
-            CompactionManager.instance.submitUserDefined(legacyHintsTable, descriptors, FBUtilities.nowInSeconds()).get();
-        }
-        catch (InterruptedException | ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void migrateLegacyHints()
-    {
-        ByteBuffer buffer = ByteBuffer.allocateDirect(256 * 1024);
-        String query = String.format("SELECT DISTINCT target_id FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_HINTS);
-        //noinspection ConstantConditions
-        QueryProcessor.executeInternal(query).forEach(row -> migrateLegacyHints(row.getUUID("target_id"), buffer));
-        FileUtils.clean(buffer);
-    }
-
-    private void migrateLegacyHints(UUID hostId, ByteBuffer buffer)
-    {
-        String query = String.format("SELECT target_id, hint_id, message_version, mutation, ttl(mutation) AS ttl, writeTime(mutation) AS write_time " +
-                                     "FROM %s.%s " +
-                                     "WHERE target_id = ?",
-                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                     SystemKeyspace.LEGACY_HINTS);
-
-        // read all the old hints (paged iterator), write them in the new format
-        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize, hostId);
-        migrateLegacyHints(hostId, rows, buffer);
-
-        // delete the whole partition in the legacy table; we would truncate the whole table afterwards, but this allows
-        // to not lose progress in case of a terminated conversion
-        deleteLegacyHintsPartition(hostId);
-    }
-
-    private void migrateLegacyHints(UUID hostId, UntypedResultSet rows, ByteBuffer buffer)
-    {
-        migrateLegacyHints(hostId, rows.iterator(), buffer);
-    }
-
-    private void migrateLegacyHints(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
-    {
-        do
-        {
-            migrateLegacyHintsInternal(hostId, iterator, buffer);
-            // if there are hints that didn't fit in the previous file, keep calling the method to write to a new
-            // file until we get everything written.
-        }
-        while (iterator.hasNext());
-    }
-
-    private void migrateLegacyHintsInternal(UUID hostId, Iterator<UntypedResultSet.Row> iterator, ByteBuffer buffer)
-    {
-        HintsDescriptor descriptor = new HintsDescriptor(hostId, System.currentTimeMillis());
-
-        try (HintsWriter writer = HintsWriter.create(hintsDirectory, descriptor))
-        {
-            try (HintsWriter.Session session = writer.newSession(buffer))
-            {
-                while (iterator.hasNext())
-                {
-                    Hint hint = convertLegacyHint(iterator.next());
-                    if (hint != null)
-                        session.append(hint);
-
-                    if (session.position() >= maxHintsFileSize)
-                        break;
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, descriptor.fileName());
-        }
-    }
-
-    private static Hint convertLegacyHint(UntypedResultSet.Row row)
-    {
-        Mutation mutation = deserializeLegacyMutation(row);
-        if (mutation == null)
-            return null;
-
-        long creationTime = row.getLong("write_time"); // milliseconds, not micros, for the hints table
-        int expirationTime = FBUtilities.nowInSeconds() + row.getInt("ttl");
-        int originalGCGS = expirationTime - (int) TimeUnit.MILLISECONDS.toSeconds(creationTime);
-
-        int gcgs = Math.min(originalGCGS, mutation.smallestGCGS());
-
-        return Hint.create(mutation, creationTime, gcgs);
-    }
-
-    private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row)
-    {
-        try (DataInputBuffer dib = new DataInputBuffer(row.getBlob("mutation"), true))
-        {
-            Mutation mutation = Mutation.serializer.deserialize(dib,
-                                                                row.getInt("message_version"));
-            mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
-            return mutation;
-        }
-        catch (IOException e)
-        {
-            logger.error("Failed to migrate a hint for {} from legacy {}.{} table",
-                         row.getUUID("target_id"),
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_HINTS,
-                         e);
-            return null;
-        }
-        catch (MarshalException e)
-        {
-            logger.warn("Failed to validate a hint for {} from legacy {}.{} table - skipping",
-                        row.getUUID("target_id"),
-                        SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                        SystemKeyspace.LEGACY_HINTS,
-                        e);
-            return null;
-        }
-    }
-
-    private static void deleteLegacyHintsPartition(UUID hostId)
-    {
-        // intentionally use millis, like the rest of the legacy implementation did, just in case
-        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyHints,
-                                                                             UUIDType.instance.decompose(hostId),
-                                                                             System.currentTimeMillis(),
-                                                                             FBUtilities.nowInSeconds()));
-        mutation.applyUnsafe();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
deleted file mode 100644
index 64f91d7..0000000
--- a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-/**
- * A serializer which forwards all its method calls to another serializer. Subclasses should override one or more
- * methods to modify the behavior of the backing serializer as desired per the decorator pattern.
- */
-public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T>
-{
-    protected ForwardingVersionedSerializer()
-    {
-    }
-
-    /**
-     * Returns the backing delegate instance that methods are forwarded to.
-     *
-     * @param version the server version
-     * @return the backing delegate instance that methods are forwarded to.
-     */
-    protected abstract IVersionedSerializer<T> delegate(int version);
-
-    public void serialize(T t, DataOutputPlus out, int version) throws IOException
-    {
-        delegate(version).serialize(t, out, version);
-    }
-
-    public T deserialize(DataInputPlus in, int version) throws IOException
-    {
-        return delegate(version).deserialize(in, version);
-    }
-
-    public long serializedSize(T t, int version)
-    {
-        return delegate(version).serializedSize(t, version);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9a4d919..7d98570 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -71,7 +71,6 @@ public class CompressionMetadata
     private final long chunkOffsetsSize;
     public final String indexFilePath;
     public final CompressionParams parameters;
-    public final ChecksumType checksumType;
 
     /**
      * Create metadata about given compressed file including uncompressed data length, chunk size
@@ -87,14 +86,13 @@ public class CompressionMetadata
     public static CompressionMetadata create(String dataFilePath)
     {
         Descriptor desc = Descriptor.fromFilename(dataFilePath);
-        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.compressedChecksumType());
+        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
     }
 
     @VisibleForTesting
-    public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
+    public CompressionMetadata(String indexFilePath, long compressedLength)
     {
         this.indexFilePath = indexFilePath;
-        this.checksumType = checksumType;
 
         try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
         {
@@ -133,7 +131,7 @@ public class CompressionMetadata
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 
-    private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, ChecksumType checksumType)
+    private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength)
     {
         this.indexFilePath = filePath;
         this.parameters = parameters;
@@ -141,7 +139,6 @@ public class CompressionMetadata
         this.compressedFileLength = compressedLength;
         this.chunkOffsets = offsets;
         this.chunkOffsetsSize = offsetsSize;
-        this.checksumType = checksumType;
     }
 
     public ICompressor compressor()
@@ -417,7 +414,7 @@ public class CompressionMetadata
             if (count < this.count)
                 compressedLength = offsets.getLong(count * 8L);
 
-            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, ChecksumType.CRC32);
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 9a8f968..7efca63 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -18,7 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.FilenameFilter;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -90,12 +90,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
     private static int getNextGeneration(File directory, final String columnFamily)
     {
         final Set<Descriptor> existing = new HashSet<>();
-        directory.list(new FilenameFilter()
+        directory.listFiles(new FileFilter()
         {
-            public boolean accept(File dir, String name)
+            public boolean accept(File file)
             {
-                Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
-                Descriptor desc = p == null ? null : p.left;
+                Descriptor desc = SSTable.tryDescriptorFromFilename(file);
                 if (desc == null)
                     return false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 38152af..469a25c 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -50,8 +50,8 @@ public class Component
         COMPRESSION_INFO("CompressionInfo.db"),
         // statistical metadata about the content of the sstable
         STATS("Statistics.db"),
-        // holds adler32 checksum of the data file
-        DIGEST("Digest.crc32", "Digest.adler32", "Digest.sha1"),
+        // holds CRC32 checksum of the data file
+        DIGEST("Digest.crc32"),
         // holds the CRC32 for chunks in an a uncompressed file.
         CRC("CRC.db"),
         // holds SSTable Index Summary (sampling of Index component)
@@ -61,16 +61,11 @@ public class Component
         // built-in secondary index (may be multiple per sstable)
         SECONDARY_INDEX("SI_.*.db"),
         // custom component, used by e.g. custom compaction strategy
-        CUSTOM(new String[] { null });
+        CUSTOM(null);
         
-        final String[] repr;
+        final String repr;
         Type(String repr)
         {
-            this(new String[] { repr });
-        }
-
-        Type(String... repr)
-        {
             this.repr = repr;
         }
 
@@ -78,9 +73,7 @@ public class Component
         {
             for (Type type : TYPES)
             {
-                if (type.repr == null || type.repr.length == 0 || type.repr[0] == null)
-                    continue;
-                if (Pattern.matches(type.repr[0], repr))
+                if (type.repr != null && Pattern.matches(type.repr, repr))
                     return type;
             }
             return CUSTOM;
@@ -93,36 +86,18 @@ public class Component
     public final static Component FILTER = new Component(Type.FILTER);
     public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
     public final static Component STATS = new Component(Type.STATS);
-    private static final String digestCrc32 = "Digest.crc32";
-    private static final String digestAdler32 = "Digest.adler32";
-    private static final String digestSha1 = "Digest.sha1";
-    public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, digestCrc32);
-    public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, digestAdler32);
-    public final static Component DIGEST_SHA1 = new Component(Type.DIGEST, digestSha1);
+    public final static Component DIGEST = new Component(Type.DIGEST);
     public final static Component CRC = new Component(Type.CRC);
     public final static Component SUMMARY = new Component(Type.SUMMARY);
     public final static Component TOC = new Component(Type.TOC);
 
-    public static Component digestFor(ChecksumType checksumType)
-    {
-        switch (checksumType)
-        {
-            case Adler32:
-                return DIGEST_ADLER32;
-            case CRC32:
-                return DIGEST_CRC32;
-        }
-        throw new AssertionError();
-    }
-
     public final Type type;
     public final String name;
     public final int hashCode;
 
     public Component(Type type)
     {
-        this(type, type.repr[0]);
-        assert type.repr.length == 1;
+        this(type, type.repr);
         assert type != Type.CUSTOM;
     }
 
@@ -143,45 +118,32 @@ public class Component
     }
 
     /**
-     * {@code
-     * Filename of the form "<ksname>/<cfname>-[tmp-][<version>-]<gen>-<component>",
-     * }
-     * @return A Descriptor for the SSTable, and a Component for this particular file.
-     * TODO move descriptor into Component field
+     * Parse the component part of a sstable filename into a {@code Component} object.
+     *
+     * @param name a string representing a sstable component.
+     * @return the component corresponding to {@code name}. Note that this always returns a component, as an unrecognized
+     * name is parsed into a CUSTOM component.
      */
-    public static Pair<Descriptor,Component> fromFilename(File directory, String name)
+    static Component parse(String name)
     {
-        Pair<Descriptor,String> path = Descriptor.fromFilename(directory, name);
+        Type type = Type.fromRepresentation(name);
 
-        // parse the component suffix
-        Type type = Type.fromRepresentation(path.right);
-        // build (or retrieve singleton for) the component object
-        Component component;
-        switch(type)
+        // Build (or retrieve singleton for) the component object
+        switch (type)
         {
-            case DATA:              component = Component.DATA;                         break;
-            case PRIMARY_INDEX:     component = Component.PRIMARY_INDEX;                break;
-            case FILTER:            component = Component.FILTER;                       break;
-            case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
-            case STATS:             component = Component.STATS;                        break;
-            case DIGEST:            switch (path.right)
-                                    {
-                                        case digestCrc32:   component = Component.DIGEST_CRC32;     break;
-                                        case digestAdler32: component = Component.DIGEST_ADLER32;   break;
-                                        case digestSha1:    component = Component.DIGEST_SHA1;      break;
-                                        default:            throw new IllegalArgumentException("Invalid digest component " + path.right);
-                                    }
-                                    break;
-            case CRC:               component = Component.CRC;                          break;
-            case SUMMARY:           component = Component.SUMMARY;                      break;
-            case TOC:               component = Component.TOC;                          break;
-            case SECONDARY_INDEX:   component = new Component(Type.SECONDARY_INDEX, path.right); break;
-            case CUSTOM:            component = new Component(Type.CUSTOM, path.right); break;
-            default:
-                 throw new IllegalStateException();
+            case DATA:             return Component.DATA;
+            case PRIMARY_INDEX:    return Component.PRIMARY_INDEX;
+            case FILTER:           return Component.FILTER;
+            case COMPRESSION_INFO: return Component.COMPRESSION_INFO;
+            case STATS:            return Component.STATS;
+            case DIGEST:           return Component.DIGEST;
+            case CRC:              return Component.CRC;
+            case SUMMARY:          return Component.SUMMARY;
+            case TOC:              return Component.TOC;
+            case SECONDARY_INDEX:  return new Component(Type.SECONDARY_INDEX, name);
+            case CUSTOM:           return new Component(Type.CUSTOM, name);
+            default:               throw new AssertionError();
         }
-
-        return Pair.create(path.left, component);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 1f7e67f..3804fd8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -26,12 +26,12 @@ import java.util.regex.Pattern;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Objects;
+import com.google.common.base.Splitter;
 
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
-import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
 
@@ -46,8 +46,13 @@ import static org.apache.cassandra.io.sstable.Component.separator;
  */
 public class Descriptor
 {
+    private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$";
+    private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR);
+
     public static String TMP_EXT = ".tmp";
 
+    private static final Splitter filenameSplitter = Splitter.on('-');
+
     /** canonicalized path to the directory where SSTable resides */
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
@@ -56,8 +61,6 @@ public class Descriptor
     public final String cfname;
     public final int generation;
     public final SSTableFormat.Type formatType;
-    /** digest component - might be {@code null} for old, legacy sstables */
-    public final Component digestComponent;
     private final int hashCode;
 
     /**
@@ -66,7 +69,7 @@ public class Descriptor
     @VisibleForTesting
     public Descriptor(File directory, String ksname, String cfname, int generation)
     {
-        this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current(), null);
+        this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current());
     }
 
     /**
@@ -74,16 +77,10 @@ public class Descriptor
      */
     public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
-        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
-    }
-
-    @VisibleForTesting
-    public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
-    {
-        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
+        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType);
     }
 
-    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)
+    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
         assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
         this.version = version;
@@ -99,24 +96,18 @@ public class Descriptor
         this.cfname = cfname;
         this.generation = generation;
         this.formatType = formatType;
-        this.digestComponent = digestComponent;
 
         hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType);
     }
 
     public Descriptor withGeneration(int newGeneration)
     {
-        return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType, digestComponent);
+        return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType);
     }
 
     public Descriptor withFormatType(SSTableFormat.Type newType)
     {
-        return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType, digestComponent);
-    }
-
-    public Descriptor withDigestComponent(Component newDigestComponent)
-    {
-        return new Descriptor(version, directory, ksname, cfname, generation, formatType, newDigestComponent);
+        return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType);
     }
 
     public String tmpFilenameFor(Component component)
@@ -139,15 +130,9 @@ public class Descriptor
 
     private void appendFileName(StringBuilder buff)
     {
-        if (!version.hasNewFileName())
-        {
-            buff.append(ksname).append(separator);
-            buff.append(cfname).append(separator);
-        }
         buff.append(version).append(separator);
         buff.append(generation);
-        if (formatType != SSTableFormat.Type.LEGACY)
-            buff.append(separator).append(formatType.name);
+        buff.append(separator).append(formatType.name);
     }
 
     public String relativeFilenameFor(Component component)
@@ -176,155 +161,156 @@ public class Descriptor
         return ret;
     }
 
-    /**
-     *  Files obsoleted by CASSANDRA-7066 : temporary files and compactions_in_progress. We support
-     *  versions 2.1 (ka) and 2.2 (la).
-     *  Temporary files have tmp- or tmplink- at the beginning for 2.2 sstables or after ks-cf- for 2.1 sstables
-     */
-
-    private final static String LEGACY_COMP_IN_PROG_REGEX_STR = "^compactions_in_progress(\\-[\\d,a-f]{32})?$";
-    private final static Pattern LEGACY_COMP_IN_PROG_REGEX = Pattern.compile(LEGACY_COMP_IN_PROG_REGEX_STR);
-    private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$";
-    private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR);
-
-    public static boolean isLegacyFile(File file)
+    public static boolean isValidFile(File file)
     {
-        if (file.isDirectory())
-            return file.getParentFile() != null &&
-                   file.getParentFile().getName().equalsIgnoreCase("system") &&
-                   LEGACY_COMP_IN_PROG_REGEX.matcher(file.getName()).matches();
-        else
-            return LEGACY_TMP_REGEX.matcher(file.getName()).matches();
-    }
-
-    public static boolean isValidFile(String fileName)
-    {
-        return fileName.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(fileName).matches();
+        String filename = file.getName();
+        return filename.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(filename).matches();
     }
 
     /**
-     * @see #fromFilename(File directory, String name)
-     * @param filename The SSTable filename
-     * @return Descriptor of the SSTable initialized from filename
+     * Parse a sstable filename into a Descriptor.
+     * <p>
+     * This is a shortcut for {@code fromFilename(new File(filename))}.
+     *
+     * @param filename the filename to a sstable component.
+     * @return the descriptor for the parsed file.
+     *
+     * @throws IllegalArgumentException if the provided {@code filename} does not point to a valid sstable filename. This could
+     * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+     * version.
      */
     public static Descriptor fromFilename(String filename)
     {
-        return fromFilename(filename, false);
-    }
-
-    public static Descriptor fromFilename(String filename, SSTableFormat.Type formatType)
-    {
-        return fromFilename(filename).withFormatType(formatType);
+        return fromFilename(new File(filename));
     }
 
-    public static Descriptor fromFilename(String filename, boolean skipComponent)
-    {
-        File file = new File(filename).getAbsoluteFile();
-        return fromFilename(file.getParentFile(), file.getName(), skipComponent).left;
-    }
-
-    public static Pair<Descriptor, String> fromFilename(File directory, String name)
+    /**
+     * Parse a sstable filename into a Descriptor.
+     * <p>
+     * SSTable files are all located within subdirectories of the form {@code <keyspace>/<table>/}. Normal sstables are
+     * directly within that subdirectory structure while secondary indexes, backups and snapshots are each inside an
+     * additional subdirectory. The files themselves have the form:
+     *   {@code <version>-<gen>-<format>-<component>}.
+     * <p>
+     * Note that this method will only successfully parse sstable files of supported versions.
+     *
+     * @param file the {@code File} object for the filename to parse.
+     * @return the descriptor for the parsed file.
+     *
+     * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could
+     * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+     * version.
+     */
+    public static Descriptor fromFilename(File file)
     {
-        return fromFilename(directory, name, false);
+        return fromFilenameWithComponent(file).left;
     }
 
     /**
-     * Filename of the form is vary by version:
+     * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part.
      *
-     * <ul>
-     *     <li>&lt;ksname&gt;-&lt;cfname&gt;-(tmp-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 2.0 and before</li>
-     *     <li>(&lt;tmp marker&gt;-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 3.0 and later</li>
-     * </ul>
+     * @param file the {@code File} object for the filename to parse.
+     * @return a pair of the descriptor and component corresponding to the provided {@code file}.
      *
-     * If this is for SSTable of secondary index, directory should ends with index name for 2.1+.
-     *
-     * @param directory The directory of the SSTable files
-     * @param name The name of the SSTable file
-     * @param skipComponent true if the name param should not be parsed for a component tag
-     *
-     * @return A Descriptor for the SSTable, and the Component remainder.
+     * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could
+     * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+     * version.
      */
-    public static Pair<Descriptor, String> fromFilename(File directory, String name, boolean skipComponent)
+    public static Pair<Descriptor, Component> fromFilenameWithComponent(File file)
     {
-        File parentDirectory = directory != null ? directory : new File(".");
+        // We need to extract the keyspace and table names from the parent directories, so make sure we deal with the
+        // absolute path.
+        if (!file.isAbsolute())
+            file = file.getAbsoluteFile();
 
-        // tokenize the filename
-        StringTokenizer st = new StringTokenizer(name, String.valueOf(separator));
-        String nexttok;
+        String name = file.getName();
+        List<String> tokens = filenameSplitter.splitToList(name);
+        int size = tokens.size();
 
-        // read tokens backwards to determine version
-        Deque<String> tokenStack = new ArrayDeque<>();
-        while (st.hasMoreTokens())
+        if (size != 4)
         {
-            tokenStack.push(st.nextToken());
+            // This is an invalid sstable file for this version. But to provide a more helpful error message, we detect
+            // old format sstable, which had the format:
+            //   <keyspace>-<table>-(tmp-)?<version>-<gen>-<component>
+            // Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect
+            // but we're just trying to be helpful, not perfect.
+            if (size == 5 || size == 6)
+                throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.",
+                                                                 name,
+                                                                 tokens.get(size - 3)));
+            throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name));
         }
 
-        // component suffix
-        String component = skipComponent ? null : tokenStack.pop();
+        String versionString = tokens.get(0);
+        if (!Version.validate(versionString))
+            throw invalidSSTable(name, "invalid version %s", versionString);
 
-        nexttok = tokenStack.pop();
-        // generation OR format type
-        SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
-        if (!CharMatcher.DIGIT.matchesAllOf(nexttok))
+        int generation;
+        try
         {
-            fmt = SSTableFormat.Type.validate(nexttok);
-            nexttok = tokenStack.pop();
+            generation = Integer.parseInt(tokens.get(1));
+        }
+        catch (NumberFormatException e)
+        {
+            throw invalidSSTable(name, "the 'generation' part of the name doesn't parse as a number");
         }
 
-        // generation
-        int generation = Integer.parseInt(nexttok);
+        String formatString = tokens.get(2);
+        SSTableFormat.Type format;
+        try
+        {
+            format = SSTableFormat.Type.validate(formatString);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw invalidSSTable(name, "unknown 'format' part (%s)", formatString);
+        }
 
-        // version
-        nexttok = tokenStack.pop();
+        Component component = Component.parse(tokens.get(3));
 
-        if (!Version.validate(nexttok))
-            throw new UnsupportedOperationException("SSTable " + name + " is too old to open.  Upgrade to 2.0 first, and run upgradesstables");
+        Version version = format.info.getVersion(versionString);
+        if (!version.isCompatible())
+            throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString);
 
-        Version version = fmt.info.getVersion(nexttok);
+        File directory = parentOf(name, file);
+        File tableDir = directory;
 
-        // ks/cf names
-        String ksname, cfname;
-        if (version.hasNewFileName())
+        // Check if it's a 2ndary index directory (note that this doesn't exclude it from also being a backup or snapshot)
+        String indexName = "";
+        if (tableDir.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
         {
-            // for 2.1+ read ks and cf names from directory
-            File cfDirectory = parentDirectory;
-            // check if this is secondary index
-            String indexName = "";
-            if (cfDirectory.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
-            {
-                indexName = cfDirectory.getName();
-                cfDirectory = cfDirectory.getParentFile();
-            }
-            if (cfDirectory.getName().equals(Directories.BACKUPS_SUBDIR))
-            {
-                cfDirectory = cfDirectory.getParentFile();
-            }
-            else if (cfDirectory.getParentFile().getName().equals(Directories.SNAPSHOT_SUBDIR))
-            {
-                cfDirectory = cfDirectory.getParentFile().getParentFile();
-            }
-            cfname = cfDirectory.getName().split("-")[0] + indexName;
-            ksname = cfDirectory.getParentFile().getName();
+            indexName = tableDir.getName();
+            tableDir = parentOf(name, tableDir);
         }
-        else
-        {
-            cfname = tokenStack.pop();
-            ksname = tokenStack.pop();
-        }
-        assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory;
 
-        return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt,
-                                          // _assume_ version from version
-                                          Component.digestFor(version.uncompressedChecksumType())),
-                           component);
+        // Then it can be a backup or a snapshot
+        if (tableDir.getName().equals(Directories.BACKUPS_SUBDIR))
+            tableDir = tableDir.getParentFile();
+        else if (parentOf(name, tableDir).getName().equals(Directories.SNAPSHOT_SUBDIR))
+            tableDir = parentOf(name, parentOf(name, tableDir));
+
+        String table = tableDir.getName().split("-")[0] + indexName;
+        String keyspace = parentOf(name, tableDir).getName();
+
+        return Pair.create(new Descriptor(version, directory, keyspace, table, generation, format), component);
+    }
+
+    private static File parentOf(String name, File file)
+    {
+        File parent = file.getParentFile();
+        if (parent == null)
+            throw invalidSSTable(name, "cannot extract keyspace and table name; make sure the sstable is in the proper sub-directories");
+        return parent;
+    }
+
+    private static IllegalArgumentException invalidSSTable(String name, String msgFormat, Object... parameters)
+    {
+        throw new IllegalArgumentException(String.format("Invalid sstable file " + name + ": " + msgFormat, parameters));
     }
 
     public IMetadataSerializer getMetadataSerializer()
     {
-        if (version.hasNewStatsFile())
-            return new MetadataSerializer();
-        else
-            return new LegacyMetadataSerializer();
+        return new MetadataSerializer();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
index 9ee1996..03246c5 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -79,6 +82,11 @@ public class IndexInfo
         this.endOpenMarker = endOpenMarker;
     }
 
+    public static IndexInfo.Serializer serializer(Version version, SerializationHeader header)
+    {
+        return new IndexInfo.Serializer(version, header.clusteringTypes());
+    }
+
     public static class Serializer implements ISerializer<IndexInfo>
     {
         // This is the default index size that we use to delta-encode width when serializing so we get better vint-encoding.
@@ -87,21 +95,19 @@ public class IndexInfo
         // size so using the default is almost surely better than using no base at all.
         public static final long WIDTH_BASE = 64 * 1024;
 
-        private final ISerializer<ClusteringPrefix> clusteringSerializer;
-        private final Version version;
+        private final int version;
+        private final List<AbstractType<?>> clusteringTypes;
 
-        public Serializer(Version version, ISerializer<ClusteringPrefix> clusteringSerializer)
+        public Serializer(Version version, List<AbstractType<?>> clusteringTypes)
         {
-            this.clusteringSerializer = clusteringSerializer;
-            this.version = version;
+            this.version = version.correspondingMessagingVersion();
+            this.clusteringTypes = clusteringTypes;
         }
 
         public void serialize(IndexInfo info, DataOutputPlus out) throws IOException
         {
-            assert version.storeRows() : "We read old index files but we should never write them";
-
-            clusteringSerializer.serialize(info.firstName, out);
-            clusteringSerializer.serialize(info.lastName, out);
+            ClusteringPrefix.serializer.serialize(info.firstName, out, version, clusteringTypes);
+            ClusteringPrefix.serializer.serialize(info.lastName, out, version, clusteringTypes);
             out.writeUnsignedVInt(info.offset);
             out.writeVInt(info.width - WIDTH_BASE);
 
@@ -112,53 +118,33 @@ public class IndexInfo
 
         public void skip(DataInputPlus in) throws IOException
         {
-            clusteringSerializer.skip(in);
-            clusteringSerializer.skip(in);
-            if (version.storeRows())
-            {
-                in.readUnsignedVInt();
-                in.readVInt();
-                if (in.readBoolean())
-                    DeletionTime.serializer.skip(in);
-            }
-            else
-            {
-                in.skipBytes(TypeSizes.sizeof(0L));
-                in.skipBytes(TypeSizes.sizeof(0L));
-            }
+            ClusteringPrefix.serializer.skip(in, version, clusteringTypes);
+            ClusteringPrefix.serializer.skip(in, version, clusteringTypes);
+            in.readUnsignedVInt();
+            in.readVInt();
+            if (in.readBoolean())
+                DeletionTime.serializer.skip(in);
         }
 
         public IndexInfo deserialize(DataInputPlus in) throws IOException
         {
-            ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
-            ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
-            long offset;
-            long width;
+            ClusteringPrefix firstName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes);
+            ClusteringPrefix lastName = ClusteringPrefix.serializer.deserialize(in, version, clusteringTypes);
+            long offset = in.readUnsignedVInt();
+            long width = in.readVInt() + WIDTH_BASE;
             DeletionTime endOpenMarker = null;
-            if (version.storeRows())
-            {
-                offset = in.readUnsignedVInt();
-                width = in.readVInt() + WIDTH_BASE;
-                if (in.readBoolean())
-                    endOpenMarker = DeletionTime.serializer.deserialize(in);
-            }
-            else
-            {
-                offset = in.readLong();
-                width = in.readLong();
-            }
+            if (in.readBoolean())
+                endOpenMarker = DeletionTime.serializer.deserialize(in);
             return new IndexInfo(firstName, lastName, offset, width, endOpenMarker);
         }
 
         public long serializedSize(IndexInfo info)
         {
-            assert version.storeRows() : "We read old index files but we should never write them";
-
-            long size = clusteringSerializer.serializedSize(info.firstName)
-                        + clusteringSerializer.serializedSize(info.lastName)
-                        + TypeSizes.sizeofUnsignedVInt(info.offset)
-                        + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
-                        + TypeSizes.sizeof(info.endOpenMarker != null);
+            long size = ClusteringPrefix.serializer.serializedSize(info.firstName, version, clusteringTypes)
+                      + ClusteringPrefix.serializer.serializedSize(info.lastName, version, clusteringTypes)
+                      + TypeSizes.sizeofUnsignedVInt(info.offset)
+                      + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
+                      + TypeSizes.sizeof(info.endOpenMarker != null);
 
             if (info.endOpenMarker != null)
                 size += DeletionTime.serializer.serializedSize(info.endOpenMarker);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 6de3478..303adfd 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -268,16 +268,13 @@ public class IndexSummary extends WrappedSharedCloseable
 
     public static class IndexSummarySerializer
     {
-        public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
+        public void serialize(IndexSummary t, DataOutputPlus out) throws IOException
         {
             out.writeInt(t.minIndexInterval);
             out.writeInt(t.offsetCount);
             out.writeLong(t.getOffHeapSize());
-            if (withSamplingLevel)
-            {
-                out.writeInt(t.samplingLevel);
-                out.writeInt(t.sizeAtFullSampling);
-            }
+            out.writeInt(t.samplingLevel);
+            out.writeInt(t.sizeAtFullSampling);
             // our on-disk representation treats the offsets and the summary data as one contiguous structure,
             // in which the offsets are based from the start of the structure. i.e., if the offsets occupy
             // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
@@ -297,7 +294,7 @@ public class IndexSummary extends WrappedSharedCloseable
         }
 
         @SuppressWarnings("resource")
-        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
+        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
         {
             int minIndexInterval = in.readInt();
             if (minIndexInterval != expectedMinIndexInterval)
@@ -308,17 +305,8 @@ public class IndexSummary extends WrappedSharedCloseable
 
             int offsetCount = in.readInt();
             long offheapSize = in.readLong();
-            int samplingLevel, fullSamplingSummarySize;
-            if (haveSamplingLevel)
-            {
-                samplingLevel = in.readInt();
-                fullSamplingSummarySize = in.readInt();
-            }
-            else
-            {
-                samplingLevel = BASE_SAMPLING_LEVEL;
-                fullSamplingSummarySize = offsetCount;
-            }
+            int samplingLevel = in.readInt();
+            int fullSamplingSummarySize = in.readInt();
 
             int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval);
             if (effectiveIndexInterval > maxIndexInterval)
@@ -355,13 +343,12 @@ public class IndexSummary extends WrappedSharedCloseable
          *
          * Only for use by offline tools like SSTableMetadataViewer, otherwise SSTable.first/last should be used.
          */
-        public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
+        public Pair<DecoratedKey, DecoratedKey> deserializeFirstLastKey(DataInputStream in, IPartitioner partitioner) throws IOException
         {
             in.skipBytes(4); // minIndexInterval
             int offsetCount = in.readInt();
             long offheapSize = in.readLong();
-            if (haveSamplingLevel)
-                in.skipBytes(8); // samplingLevel, fullSamplingSummarySize
+            in.skipBytes(8); // samplingLevel, fullSamplingSummarySize
 
             in.skip(offsetCount * 4);
             in.skip(offheapSize - offsetCount * 4);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 8fb4835..fc326dc 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -73,21 +73,9 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
     public List<SSTableReader> redistributeSummaries() throws IOException
     {
         logger.info("Redistributing index summaries");
-        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
         List<SSTableReader> redistribute = new ArrayList<>();
         for (LifecycleTransaction txn : transactions.values())
         {
-            for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
-            {
-                // We can't change the sampling level of sstables with the old format, because the serialization format
-                // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
-                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
-                if (!sstable.descriptor.version.hasSamplingLevel())
-                {
-                    oldFormatSSTables.add(sstable);
-                    txn.cancel(sstable);
-                }
-            }
             redistribute.addAll(txn.originals());
         }
 
@@ -119,7 +107,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
 
         long remainingBytes = memoryPoolBytes;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
+        for (SSTableReader sstable : compacting)
             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
 
         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
@@ -130,7 +118,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
             txn.finish();
 
         total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
+        for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
             total += sstable.getIndexSummaryOffHeapSize();
         logger.trace("Completed resizing of index summaries; current approximate memory used: {}",
                      FBUtilities.prettyPrintMemory(total));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 601f5a0..8556cfa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -168,14 +168,40 @@ public abstract class SSTable
     }
 
     /**
-     * @return Descriptor and Component pair. null if given file is not acceptable as SSTable component.
-     *         If component is of unknown type, returns CUSTOM component.
+     * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object.
+     *
+     * @param file the filename to parse.
+     * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to
+     * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be
+     * returned as CUSTOM ones.
+     */
+    public static Pair<Descriptor, Component> tryComponentFromFilename(File file)
+    {
+        try
+        {
+            return Descriptor.fromFilenameWithComponent(file);
+        }
+        catch (Throwable e)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Parse a sstable filename into a {@link Descriptor} object.
+     * <p>
+     * Note that this method ignores the component part of the filename; if this is not what you want, use
+     * {@link #tryComponentFromFilename} instead.
+     *
+     * @param file the filename to parse.
+     * @return the {@code Descriptor} corresponding to {@code file} if it corresponds to a valid and supported sstable
+     * filename, {@code null} otherwise.
      */
-    public static Pair<Descriptor, Component> tryComponentFromFilename(File dir, String name)
+    public static Descriptor tryDescriptorFromFilename(File file)
     {
         try
         {
-            return Component.fromFilename(dir, name);
+            return Descriptor.fromFilename(file);
         }
         catch (Throwable e)
         {
@@ -218,17 +244,9 @@ public abstract class SSTable
         Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
         for (Component.Type componentType : knownTypes)
         {
-            if (componentType == Component.Type.DIGEST)
-            {
-                if (desc.digestComponent != null && new File(desc.filenameFor(desc.digestComponent)).exists())
-                    components.add(desc.digestComponent);
-            }
-            else
-            {
-                Component component = new Component(componentType);
-                if (new File(desc.filenameFor(component)).exists())
-                    components.add(component);
-            }
+            Component component = new Component(componentType);
+            if (new File(desc.filenameFor(component)).exists())
+                components.add(component);
         }
         return components;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 043f6fa..e00de4a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -85,7 +85,7 @@ public class SSTableLoader implements StreamEventHandler
                                               return false;
                                           }
 
-                                          Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
+                                          Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file);
                                           Descriptor desc = p == null ? null : p.left;
                                           if (p == null || !p.right.equals(Component.DATA))
                                               return false;