Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:49:59 UTC

[06/11] cassandra git commit: Remove pre-3.0 compatibility code for 4.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
deleted file mode 100644
index d0fc151..0000000
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ /dev/null
@@ -1,1099 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.schema;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.FieldIdentifier;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.functions.FunctionName;
-import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.functions.UDFunction;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static java.lang.String.format;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-
-/**
- * This majestic class performs migration from legacy (pre-3.0) system.schema_* schema tables to the new and glorious
- * system_schema keyspace.
- *
- * The goal is to not lose any information in the migration - including the timestamps.
- */
-@SuppressWarnings("deprecation")
-public final class LegacySchemaMigrator
-{
-    private LegacySchemaMigrator()
-    {
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(LegacySchemaMigrator.class);
-
-    static final List<CFMetaData> LegacySchemaTables =
-        ImmutableList.of(SystemKeyspace.LegacyKeyspaces,
-                         SystemKeyspace.LegacyColumnfamilies,
-                         SystemKeyspace.LegacyColumns,
-                         SystemKeyspace.LegacyTriggers,
-                         SystemKeyspace.LegacyUsertypes,
-                         SystemKeyspace.LegacyFunctions,
-                         SystemKeyspace.LegacyAggregates);
-
-    public static void migrate()
-    {
-        // read metadata from the legacy schema tables
-        Collection<Keyspace> keyspaces = readSchema();
-
-        // if already upgraded, or starting a new 3.0 node, abort early
-        if (keyspaces.isEmpty())
-        {
-            unloadLegacySchemaTables();
-            return;
-        }
-
-        // write metadata to the new schema tables
-        logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
-                    keyspaces.size(),
-                    SchemaConstants.SCHEMA_KEYSPACE_NAME);
-        keyspaces.forEach(LegacySchemaMigrator::storeKeyspaceInNewSchemaTables);
-        keyspaces.forEach(LegacySchemaMigrator::migrateBuiltIndexesForKeyspace);
-
-        // flush the new tables before truncating the old ones
-        SchemaKeyspace.flush();
-
-        // truncate the original tables (will be snapshotted now, and will have been snapshotted by pre-flight checks)
-        logger.info("Truncating legacy schema tables");
-        truncateLegacySchemaTables();
-
-        // remove legacy schema tables from Schema, so that their presence doesn't give the users any wrong ideas
-        unloadLegacySchemaTables();
-
-        logger.info("Completed migration of legacy schema tables");
-    }
-
-    private static void migrateBuiltIndexesForKeyspace(Keyspace keyspace)
-    {
-        keyspace.tables.forEach(LegacySchemaMigrator::migrateBuiltIndexesForTable);
-    }
-
-    private static void migrateBuiltIndexesForTable(Table table)
-    {
-        table.metadata.getIndexes().forEach((index) -> migrateIndexBuildStatus(table.metadata.ksName,
-                                                                               table.metadata.cfName,
-                                                                               index));
-    }
-
-    private static void migrateIndexBuildStatus(String keyspace, String table, IndexMetadata index)
-    {
-        if (SystemKeyspace.isIndexBuilt(keyspace, table + '.' + index.name))
-        {
-            SystemKeyspace.setIndexBuilt(keyspace, index.name);
-            SystemKeyspace.setIndexRemoved(keyspace, table + '.' + index.name);
-        }
-    }
-
-    static void unloadLegacySchemaTables()
-    {
-        KeyspaceMetadata systemKeyspace = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-
-        Tables systemTables = systemKeyspace.tables;
-        for (CFMetaData table : LegacySchemaTables)
-            systemTables = systemTables.without(table.cfName);
-
-        LegacySchemaTables.forEach(Schema.instance::unload);
-        LegacySchemaTables.forEach((cfm) -> org.apache.cassandra.db.Keyspace.openAndGetStore(cfm).invalidate());
-
-        Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
-    }
-
-    private static void truncateLegacySchemaTables()
-    {
-        LegacySchemaTables.forEach(table -> Schema.instance.getColumnFamilyStoreInstance(table.cfId).truncateBlocking());
-    }
-
-    private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
-    {
-        logger.info("Migrating keyspace {}", keyspace);
-
-        Mutation.SimpleBuilder builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp);
-        for (Table table : keyspace.tables)
-            SchemaKeyspace.addTableToSchemaMutation(table.metadata, true, builder.timestamp(table.timestamp));
-
-        for (Type type : keyspace.types)
-            SchemaKeyspace.addTypeToSchemaMutation(type.metadata, builder.timestamp(type.timestamp));
-
-        for (Function function : keyspace.functions)
-            SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, builder.timestamp(function.timestamp));
-
-        for (Aggregate aggregate : keyspace.aggregates)
-            SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, builder.timestamp(aggregate.timestamp));
-
-        builder.build().apply();
-    }
-
-    /*
-     * Read all keyspace metadata (including nested tables, types, and functions), with their modification timestamps
-     */
-    private static Collection<Keyspace> readSchema()
-    {
-        String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_KEYSPACES);
-        Collection<String> keyspaceNames = new ArrayList<>();
-        query(query).forEach(row -> keyspaceNames.add(row.getString("keyspace_name")));
-        keyspaceNames.removeAll(SchemaConstants.SYSTEM_KEYSPACE_NAMES);
-
-        Collection<Keyspace> keyspaces = new ArrayList<>();
-        keyspaceNames.forEach(name -> keyspaces.add(readKeyspace(name)));
-        return keyspaces;
-    }
-
-    private static Keyspace readKeyspace(String keyspaceName)
-    {
-        long timestamp = readKeyspaceTimestamp(keyspaceName);
-        KeyspaceParams params = readKeyspaceParams(keyspaceName);
-
-        Collection<Table> tables = readTables(keyspaceName);
-        Collection<Type> types = readTypes(keyspaceName);
-        Collection<Function> functions = readFunctions(keyspaceName);
-        Functions.Builder functionsBuilder = Functions.builder();
-        functions.forEach(udf -> functionsBuilder.add(udf.metadata));
-        Collection<Aggregate> aggregates = readAggregates(functionsBuilder.build(), keyspaceName);
-
-        return new Keyspace(timestamp, keyspaceName, params, tables, types, functions, aggregates);
-    }
-
-    /*
-     * Reading keyspace params
-     */
-
-    private static long readKeyspaceTimestamp(String keyspaceName)
-    {
-        String query = format("SELECT writeTime(durable_writes) AS timestamp FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_KEYSPACES);
-        return query(query, keyspaceName).one().getLong("timestamp");
-    }
-
-    private static KeyspaceParams readKeyspaceParams(String keyspaceName)
-    {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_KEYSPACES);
-        UntypedResultSet.Row row = query(query, keyspaceName).one();
-
-        boolean durableWrites = row.getBoolean("durable_writes");
-
-        Map<String, String> replication = new HashMap<>();
-        replication.putAll(fromJsonMap(row.getString("strategy_options")));
-        replication.put(ReplicationParams.CLASS, row.getString("strategy_class"));
-
-        return KeyspaceParams.create(durableWrites, replication);
-    }
-
-    /*
-     * Reading tables
-     */
-
-    private static Collection<Table> readTables(String keyspaceName)
-    {
-        String query = format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
-        Collection<String> tableNames = new ArrayList<>();
-        query(query, keyspaceName).forEach(row -> tableNames.add(row.getString("columnfamily_name")));
-
-        Collection<Table> tables = new ArrayList<>();
-        tableNames.forEach(name -> tables.add(readTable(keyspaceName, name)));
-        return tables;
-    }
-
-    private static Table readTable(String keyspaceName, String tableName)
-    {
-        long timestamp = readTableTimestamp(keyspaceName, tableName);
-        CFMetaData metadata = readTableMetadata(keyspaceName, tableName);
-        return new Table(timestamp, metadata);
-    }
-
-    private static long readTableTimestamp(String keyspaceName, String tableName)
-    {
-        String query = format("SELECT writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
-        return query(query, keyspaceName, tableName).one().getLong("timestamp");
-    }
-
-    private static CFMetaData readTableMetadata(String keyspaceName, String tableName)
-    {
-        String tableQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
-                                   SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                   SystemKeyspace.LEGACY_COLUMNFAMILIES);
-        UntypedResultSet.Row tableRow = query(tableQuery, keyspaceName, tableName).one();
-
-        String columnsQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
-                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                     SystemKeyspace.LEGACY_COLUMNS);
-        UntypedResultSet columnRows = query(columnsQuery, keyspaceName, tableName);
-
-        String triggersQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
-                                      SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                      SystemKeyspace.LEGACY_TRIGGERS);
-        UntypedResultSet triggerRows = query(triggersQuery, keyspaceName, tableName);
-
-        return decodeTableMetadata(tableRow, columnRows, triggerRows);
-    }
-
-    private static CFMetaData decodeTableMetadata(UntypedResultSet.Row tableRow,
-                                                  UntypedResultSet columnRows,
-                                                  UntypedResultSet triggerRows)
-    {
-        String ksName = tableRow.getString("keyspace_name");
-        String cfName = tableRow.getString("columnfamily_name");
-
-        AbstractType<?> rawComparator = TypeParser.parse(tableRow.getString("comparator"));
-        AbstractType<?> subComparator = tableRow.has("subcomparator") ? TypeParser.parse(tableRow.getString("subcomparator")) : null;
-
-        boolean isSuper = "super".equals(tableRow.getString("type").toLowerCase(Locale.ENGLISH));
-        boolean isCompound = rawComparator instanceof CompositeType || isSuper;
-
-        /*
-         * Determine whether or not the table is *really* dense.
-         * We cannot trust an is_dense value of true (see CASSANDRA-11502, which fixed the issue for 2.2 only, and not retroactively),
-         * but we can trust an is_dense value of false.
-         */
-        Boolean rawIsDense = tableRow.has("is_dense") ? tableRow.getBoolean("is_dense") : null;
-        boolean isDense;
-        if (rawIsDense != null && !rawIsDense)
-            isDense = false;
-        else
-            isDense = calculateIsDense(rawComparator, columnRows);
-
-        // Now, if we switched to sparse, remove the redundant compact_value column and the last clustering column,
-        // directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
-        Iterable<UntypedResultSet.Row> filteredColumnRows = !isDense && (rawIsDense == null || rawIsDense)
-                                                          ? filterOutRedundantRowsForSparse(columnRows, isSuper, isCompound)
-                                                          : columnRows;
-
-        // We don't really use the default validator, but since we have it for backward compatibility, we use it to know whether it's a counter table
-        AbstractType<?> defaultValidator = TypeParser.parse(tableRow.getString("default_validator"));
-        boolean isCounter = defaultValidator instanceof CounterColumnType;
-
-        /*
-         * With CASSANDRA-5202 we stopped inferring the cf id from the combination of keyspace/table names,
-         * and started storing the generated uuids in system.schema_columnfamilies.
-         *
-         * In 3.0 we SHOULD NOT see tables like that (2.0-created, non-upgraded).
-         * But on the off chance that we do, we generate the deterministic uuid here.
-         */
-        UUID cfId = tableRow.has("cf_id")
-                  ? tableRow.getUUID("cf_id")
-                  : CFMetaData.generateLegacyCfId(ksName, cfName);
-
-        boolean isCQLTable = !isSuper && !isDense && isCompound;
-        boolean isStaticCompactTable = !isDense && !isCompound;
-
-        // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from
-        // previous versions, they may not have the expected schema, so detect if we need to upgrade and do
-        // it in createColumnsFromColumnRows.
-        // We can remove this once we don't support upgrade from versions < 3.0.
-        boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(filteredColumnRows, isSuper, isStaticCompactTable);
-
-        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(filteredColumnRows,
-                                                                        ksName,
-                                                                        cfName,
-                                                                        rawComparator,
-                                                                        subComparator,
-                                                                        isSuper,
-                                                                        isCQLTable,
-                                                                        isStaticCompactTable,
-                                                                        needsUpgrade);
-
-        if (needsUpgrade)
-        {
-            addDefinitionForUpgrade(columnDefs,
-                                    ksName,
-                                    cfName,
-                                    isStaticCompactTable,
-                                    isSuper,
-                                    rawComparator,
-                                    subComparator,
-                                    defaultValidator);
-        }
-
-        CFMetaData cfm = CFMetaData.create(ksName,
-                                           cfName,
-                                           cfId,
-                                           isDense,
-                                           isCompound,
-                                           isSuper,
-                                           isCounter,
-                                           false, // legacy schema did not contain views
-                                           columnDefs,
-                                           DatabaseDescriptor.getPartitioner());
-
-        Indexes indexes = createIndexesFromColumnRows(cfm,
-                                                      filteredColumnRows,
-                                                      ksName,
-                                                      cfName,
-                                                      rawComparator,
-                                                      subComparator,
-                                                      isSuper,
-                                                      isCQLTable,
-                                                      isStaticCompactTable,
-                                                      needsUpgrade);
-        cfm.indexes(indexes);
-
-        if (tableRow.has("dropped_columns"))
-            addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance));
-
-        return cfm.params(decodeTableParams(tableRow))
-                  .triggers(createTriggersFromTriggerRows(triggerRows));
-    }
-
-    /*
-     * We call a CF dense when each component of the comparator is a clustering column, i.e. no
-     * component is used to store regular column names. In other words, non-composite static "thrift"
-     * and CQL3 CFs are *not* dense.
-     * We save whether the table is dense or not during table creation through CQL, but we don't have this
-     * information for tables created through thrift, nor for tables created prior to CASSANDRA-7744, so this
-     * method does its best to infer whether the table is dense or not based on other elements.
-     */
-    private static boolean calculateIsDense(AbstractType<?> comparator, UntypedResultSet columnRows)
-    {
-        /*
-         * As said above, this method is only here because we need to deal with thrift upgrades.
-         * Once a CF has been "upgraded", i.e. we've rebuilt and saved its CQL3 metadata at least once,
-         * then we'll have saved the "is_dense" value and will be good to go.
-         *
-         * But non-upgraded thrift CFs (and pre-7744 CFs) will have no value for "is_dense", so we need
-         * to infer that information without relying on it in that case. For the most part this is
-         * easy: a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
-         * having a REGULAR definition does not necessarily mean dense, because of CQL3 definitions that
-         * have only the PRIMARY KEY defined.
-         *
-         * So we need to recognize those special-case CQL3 tables with only a primary key. If we have some
-         * clustering columns, we're fine, as said above. The only problem is that we cannot decide for
-         * sure whether a CF without REGULAR columns or a CLUSTERING_COLUMN definition is meant to be dense, or if it
-         * has been created in CQL3 by say:
-         *    CREATE TABLE test (k int PRIMARY KEY)
-         * in which case it should not be dense. However, we can limit our margin of error by assuming we are
-         * in the latter case only if the comparator is exactly CompositeType(UTF8Type).
-         */
-        for (UntypedResultSet.Row columnRow : columnRows)
-            if ("regular".equals(columnRow.getString("type")))
-                return false;
-
-        int maxClusteringIdx = -1;
-        for (UntypedResultSet.Row columnRow : columnRows)
-            if ("clustering_key".equals(columnRow.getString("type")))
-                maxClusteringIdx = Math.max(maxClusteringIdx, columnRow.has("component_index") ? columnRow.getInt("component_index") : 0);
-
-        return maxClusteringIdx >= 0
-             ? maxClusteringIdx == comparator.componentsCount() - 1
-             : !isCQL3OnlyPKComparator(comparator);
-    }
-
-    private static Iterable<UntypedResultSet.Row> filterOutRedundantRowsForSparse(UntypedResultSet columnRows, boolean isSuper, boolean isCompound)
-    {
-        Collection<UntypedResultSet.Row> filteredRows = new ArrayList<>();
-        for (UntypedResultSet.Row columnRow : columnRows)
-        {
-            String kind = columnRow.getString("type");
-
-            if ("compact_value".equals(kind))
-                continue;
-
-            if ("clustering_key".equals(kind))
-            {
-                int position = columnRow.has("component_index") ? columnRow.getInt("component_index") : 0;
-                if (isSuper && position != 0)
-                    continue;
-
-                if (!isSuper && !isCompound)
-                    continue;
-            }
-
-            filteredRows.add(columnRow);
-        }
-
-        return filteredRows;
-    }
-
-    private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
-    {
-        if (!(comparator instanceof CompositeType))
-            return false;
-
-        CompositeType ct = (CompositeType)comparator;
-        return ct.types.size() == 1 && ct.types.get(0) instanceof UTF8Type;
-    }
-
-    private static TableParams decodeTableParams(UntypedResultSet.Row row)
-    {
-        TableParams.Builder params = TableParams.builder();
-
-        params.readRepairChance(row.getDouble("read_repair_chance"))
-              .dcLocalReadRepairChance(row.getDouble("local_read_repair_chance"))
-              .gcGraceSeconds(row.getInt("gc_grace_seconds"));
-
-        if (row.has("comment"))
-            params.comment(row.getString("comment"));
-
-        if (row.has("memtable_flush_period_in_ms"))
-            params.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"));
-
-        params.caching(CachingParams.fromMap(fromJsonMap(row.getString("caching"))));
-
-        if (row.has("default_time_to_live"))
-            params.defaultTimeToLive(row.getInt("default_time_to_live"));
-
-        if (row.has("speculative_retry"))
-            params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
-
-        Map<String, String> compressionParameters = fromJsonMap(row.getString("compression_parameters"));
-        String crcCheckChance = compressionParameters.remove("crc_check_chance");
-        // crc_check_chance was promoted from a compression property to a top-level property
-        if (crcCheckChance != null)
-            params.crcCheckChance(Double.parseDouble(crcCheckChance));
-
-        params.compression(CompressionParams.fromMap(compressionParameters));
-
-        params.compaction(compactionFromRow(row));
-
-        if (row.has("min_index_interval"))
-            params.minIndexInterval(row.getInt("min_index_interval"));
-
-        if (row.has("max_index_interval"))
-            params.maxIndexInterval(row.getInt("max_index_interval"));
-
-        if (row.has("bloom_filter_fp_chance"))
-            params.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"));
-
-        return params.build();
-    }
-
-    /*
-     * This method is needed to migrate max_compaction_threshold and min_compaction_threshold
-     * into the compaction options map, where they belong.
-     *
-     * We must use reflection to validate the options because not every compaction strategy respects and supports
-     * the threshold params (LCS doesn't, STCS and DTCS do).
-     */
-    @SuppressWarnings("unchecked")
-    private static CompactionParams compactionFromRow(UntypedResultSet.Row row)
-    {
-        Class<? extends AbstractCompactionStrategy> klass =
-            CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class"));
-        Map<String, String> options = fromJsonMap(row.getString("compaction_strategy_options"));
-
-        int minThreshold = row.getInt("min_compaction_threshold");
-        int maxThreshold = row.getInt("max_compaction_threshold");
-
-        Map<String, String> optionsWithThresholds = new HashMap<>(options);
-        optionsWithThresholds.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold));
-        optionsWithThresholds.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold));
-
-        try
-        {
-            Map<String, String> unrecognizedOptions =
-                (Map<String, String>) klass.getMethod("validateOptions", Map.class).invoke(null, optionsWithThresholds);
-
-            if (unrecognizedOptions.isEmpty())
-                options = optionsWithThresholds;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        return CompactionParams.create(klass, options);
-    }
-
-    // Should only be called on compact tables
-    private static boolean checkNeedsUpgrade(Iterable<UntypedResultSet.Row> defs, boolean isSuper, boolean isStaticCompactTable)
-    {
-        if (isSuper)
-        {
-            // Check if we've added the "supercolumn map" column yet or not
-            for (UntypedResultSet.Row row : defs)
-                if (row.getString("column_name").isEmpty())
-                    return false;
-            return true;
-        }
-
-        // For static compact tables, we need to upgrade if the regular definitions haven't been converted to static yet,
-        // i.e. if we don't have a static definition yet.
-        if (isStaticCompactTable)
-            return !hasKind(defs, ColumnDefinition.Kind.STATIC);
-
-        // For dense compact tables, we need to upgrade if we don't have a compact value definition
-        return !hasRegularColumns(defs);
-    }
-
-    private static boolean hasRegularColumns(Iterable<UntypedResultSet.Row> columnRows)
-    {
-        for (UntypedResultSet.Row row : columnRows)
-        {
-            /*
-             * We need to special case and ignore the empty compact column (pre-3.0, COMPACT STORAGE, primary-key only tables),
-             * since deserializeKind() will otherwise just return a REGULAR.
-             * We want the proper EmptyType regular column to be added by addDefinitionForUpgrade(), so we need
-             * checkNeedsUpgrade() to return true in this case.
-             * See CASSANDRA-9874.
-             */
-            if (isEmptyCompactValueColumn(row))
-                return false;
-
-            if (deserializeKind(row.getString("type")) == ColumnDefinition.Kind.REGULAR)
-                return true;
-        }
-
-        return false;
-    }
-
-    private static boolean isEmptyCompactValueColumn(UntypedResultSet.Row row)
-    {
-        return "compact_value".equals(row.getString("type")) && row.getString("column_name").isEmpty();
-    }
-
-    private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
-                                                String ksName,
-                                                String cfName,
-                                                boolean isStaticCompactTable,
-                                                boolean isSuper,
-                                                AbstractType<?> rawComparator,
-                                                AbstractType<?> subComparator,
-                                                AbstractType<?> defaultValidator)
-    {
-        CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
-
-        if (isSuper)
-        {
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true)));
-        }
-        else if (isStaticCompactTable)
-        {
-            defs.add(ColumnDefinition.clusteringDef(ksName, cfName, names.defaultClusteringName(), rawComparator, 0));
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator));
-        }
-        else
-        {
-            // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it
-            // (we use EmptyType to recognize that the compact value was not declared by the user (see CreateTableStatement too))
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance));
-        }
-    }
-
-    private static boolean hasKind(Iterable<UntypedResultSet.Row> defs, ColumnDefinition.Kind kind)
-    {
-        for (UntypedResultSet.Row row : defs)
-            if (deserializeKind(row.getString("type")) == kind)
-                return true;
-
-        return false;
-    }
-
-    /*
-     * Prior to 3.0 we used to not store the type of the dropped columns, relying on all collection info being
-     * present in the comparator, forever. That allowed us to perform certain validations in AlterTableStatement
-     * (namely, not allowing re-adding an incompatible collection column with the same name but a different type).
-     *
-     * In 3.0, we no longer preserve the original comparator, and reconstruct it from the columns instead. That means
-     * that we should preserve the type of the dropped columns now, and, during migration, fetch the types from
-     * the original comparator if necessary.
-     */
-    private static void addDroppedColumns(CFMetaData cfm, AbstractType<?> comparator, Map<String, Long> droppedTimes)
-    {
-        AbstractType<?> last = comparator.getComponents().get(comparator.componentsCount() - 1);
-        Map<ByteBuffer, CollectionType> collections = last instanceof ColumnToCollectionType
-                                                    ? ((ColumnToCollectionType) last).defined
-                                                    : Collections.emptyMap();
-
-        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
-        {
-            String name = entry.getKey();
-            ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
-            long time = entry.getValue();
-
-            AbstractType<?> type = collections.containsKey(nameBytes)
-                                 ? collections.get(nameBytes)
-                                 : BytesType.instance;
-
-            cfm.getDroppedColumns().put(nameBytes, new CFMetaData.DroppedColumn(name, type, time, ColumnDefinition.Kind.REGULAR));
-        }
-    }
-
-    private static List<ColumnDefinition> createColumnsFromColumnRows(Iterable<UntypedResultSet.Row> rows,
-                                                                      String keyspace,
-                                                                      String table,
-                                                                      AbstractType<?> rawComparator,
-                                                                      AbstractType<?> rawSubComparator,
-                                                                      boolean isSuper,
-                                                                      boolean isCQLTable,
-                                                                      boolean isStaticCompactTable,
-                                                                      boolean needsUpgrade)
-    {
-        List<ColumnDefinition> columns = new ArrayList<>();
-
-        for (UntypedResultSet.Row row : rows)
-        {
-            // Skip the empty compact value column. Make addDefinitionForUpgrade() re-add the proper REGULAR one.
-            if (isEmptyCompactValueColumn(row))
-                continue;
-
-            columns.add(createColumnFromColumnRow(row,
-                                                  keyspace,
-                                                  table,
-                                                  rawComparator,
-                                                  rawSubComparator,
-                                                  isSuper,
-                                                  isCQLTable,
-                                                  isStaticCompactTable,
-                                                  needsUpgrade));
-        }
-
-        return columns;
-    }
-
-    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
-                                                              String keyspace,
-                                                              String table,
-                                                              AbstractType<?> rawComparator,
-                                                              AbstractType<?> rawSubComparator,
-                                                              boolean isSuper,
-                                                              boolean isCQLTable,
-                                                              boolean isStaticCompactTable,
-                                                              boolean needsUpgrade)
-    {
-        String rawKind = row.getString("type");
-
-        ColumnDefinition.Kind kind = deserializeKind(rawKind);
-        if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
-            kind = ColumnDefinition.Kind.STATIC;
-
-        int componentIndex = ColumnDefinition.NO_POSITION;
-        // Note that the component_index is not useful for non-primary-key parts (it never really was, in fact, since there is
-        // no particular ordering of non-PK columns; we only used it as a simplification, but that's not needed
-        // anymore)
-        if (kind.isPrimaryKeyKind())
-            // We used to not have a component index when there was a single partition key; we don't anymore (#10491)
-            componentIndex = row.has("component_index") ? row.getInt("component_index") : 0;
-
-        // Note: we save the column name as a string, but we should not assume that it is a UTF8 name, so
-        // we need to use the comparator's fromString method
-        AbstractType<?> comparator = isCQLTable
-                                     ? UTF8Type.instance
-                                     : CompactTables.columnDefinitionComparator(rawKind, isSuper, rawComparator, rawSubComparator);
-        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
-
-        AbstractType<?> validator = parseType(row.getString("validator"));
-
-        // In the 2.x schema we didn't store UDTs with a FrozenType wrapper because they were implicitly frozen.  After
-        // CASSANDRA-7423 (non-frozen UDTs), this is no longer true, so we need to freeze UDTs and nested freezable
-        // types (UDTs and collections) to properly migrate the schema.  See CASSANDRA-11609 and CASSANDRA-11613.
-        if (validator.isUDT() && validator.isMultiCell())
-            validator = validator.freeze();
-        else
-            validator = validator.freezeNestedMulticellTypes();
-
-        return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
-    }
-
-    private static Indexes createIndexesFromColumnRows(CFMetaData cfm,
-                                                       Iterable<UntypedResultSet.Row> rows,
-                                                       String keyspace,
-                                                       String table,
-                                                       AbstractType<?> rawComparator,
-                                                       AbstractType<?> rawSubComparator,
-                                                       boolean isSuper,
-                                                       boolean isCQLTable,
-                                                       boolean isStaticCompactTable,
-                                                       boolean needsUpgrade)
-    {
-        Indexes.Builder indexes = Indexes.builder();
-
-        for (UntypedResultSet.Row row : rows)
-        {
-            IndexMetadata.Kind kind = null;
-            if (row.has("index_type"))
-                kind = IndexMetadata.Kind.valueOf(row.getString("index_type"));
-
-            if (kind == null)
-                continue;
-
-            Map<String, String> indexOptions = null;
-            if (row.has("index_options"))
-                indexOptions = fromJsonMap(row.getString("index_options"));
-
-            if (row.has("index_name"))
-            {
-                String indexName = row.getString("index_name");
-
-                ColumnDefinition column = createColumnFromColumnRow(row,
-                                                                    keyspace,
-                                                                    table,
-                                                                    rawComparator,
-                                                                    rawSubComparator,
-                                                                    isSuper,
-                                                                    isCQLTable,
-                                                                    isStaticCompactTable,
-                                                                    needsUpgrade);
-
-                indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
-            }
-            else
-            {
-                logger.error("Failed to find index name for legacy migration of index on {}.{}", keyspace, table);
-            }
-        }
-
-        return indexes.build();
-    }
-
-    private static ColumnDefinition.Kind deserializeKind(String kind)
-    {
-        if ("clustering_key".equalsIgnoreCase(kind))
-            return ColumnDefinition.Kind.CLUSTERING;
-
-        if ("compact_value".equalsIgnoreCase(kind))
-            return ColumnDefinition.Kind.REGULAR;
-
-        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
-    }
-
-    private static Triggers createTriggersFromTriggerRows(UntypedResultSet rows)
-    {
-        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
-        rows.forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
-        return triggers.build();
-    }
-
-    private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("trigger_name");
-        String classOption = row.getTextMap("trigger_options").get("class");
-        return new TriggerMetadata(name, classOption);
-    }
-
-    /*
-     * Reading user types
-     */
-
-    private static Collection<Type> readTypes(String keyspaceName)
-    {
-        String query = format("SELECT type_name FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_USERTYPES);
-        Collection<String> typeNames = new ArrayList<>();
-        query(query, keyspaceName).forEach(row -> typeNames.add(row.getString("type_name")));
-
-        Collection<Type> types = new ArrayList<>();
-        typeNames.forEach(name -> types.add(readType(keyspaceName, name)));
-        return types;
-    }
-
-    private static Type readType(String keyspaceName, String typeName)
-    {
-        long timestamp = readTypeTimestamp(keyspaceName, typeName);
-        UserType metadata = readTypeMetadata(keyspaceName, typeName);
-        return new Type(timestamp, metadata);
-    }
-
-    /*
-     * Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
-     * use the writeTime() CQL function, and must resort to a lower level.
-     */
-    private static long readTypeTimestamp(String keyspaceName, String typeName)
-    {
-        ColumnFamilyStore store = org.apache.cassandra.db.Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
-                                                                  .getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES);
-
-        ClusteringComparator comparator = store.metadata.comparator;
-        Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
-        int nowInSec = FBUtilities.nowInSeconds();
-        DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName));
-        SinglePartitionReadCommand command = SinglePartitionReadCommand.create(store.metadata, nowInSec, key, slices);
-
-        try (ReadExecutionController controller = command.executionController();
-             RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, controller), nowInSec))
-        {
-            return partition.next().primaryKeyLivenessInfo().timestamp();
-        }
-    }
-
-    private static UserType readTypeMetadata(String keyspaceName, String typeName)
-    {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND type_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_USERTYPES);
-        UntypedResultSet.Row row = query(query, keyspaceName, typeName).one();
-
-        List<FieldIdentifier> names =
-            row.getList("field_names", UTF8Type.instance)
-               .stream()
-               .map(t -> FieldIdentifier.forInternalString(t))
-               .collect(Collectors.toList());
-
-        List<AbstractType<?>> types =
-            row.getList("field_types", UTF8Type.instance)
-               .stream()
-               .map(LegacySchemaMigrator::parseType)
-               .collect(Collectors.toList());
-
-        return new UserType(keyspaceName, bytes(typeName), names, types, true);
-    }
-
-    /*
-     * Reading UDFs
-     */
-
-    private static Collection<Function> readFunctions(String keyspaceName)
-    {
-        String query = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_FUNCTIONS);
-        HashMultimap<String, List<String>> functionSignatures = HashMultimap.create();
-        query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance)));
-
-        Collection<Function> functions = new ArrayList<>();
-        functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue())));
-        return functions;
-    }
-
-    private static Function readFunction(String keyspaceName, String functionName, List<String> signature)
-    {
-        long timestamp = readFunctionTimestamp(keyspaceName, functionName, signature);
-        UDFunction metadata = readFunctionMetadata(keyspaceName, functionName, signature);
-        return new Function(timestamp, metadata);
-    }
-
-    private static long readFunctionTimestamp(String keyspaceName, String functionName, List<String> signature)
-    {
-        String query = format("SELECT writeTime(return_type) AS timestamp " +
-                              "FROM %s.%s " +
-                              "WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_FUNCTIONS);
-        return query(query, keyspaceName, functionName, signature).one().getLong("timestamp");
-    }
-
-    private static UDFunction readFunctionMetadata(String keyspaceName, String functionName, List<String> signature)
-    {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_FUNCTIONS);
-        UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
-
-        FunctionName name = new FunctionName(keyspaceName, functionName);
-
-        List<ColumnIdentifier> argNames = new ArrayList<>();
-        if (row.has("argument_names"))
-            for (String arg : row.getList("argument_names", UTF8Type.instance))
-                argNames.add(new ColumnIdentifier(arg, true));
-
-        List<AbstractType<?>> argTypes = new ArrayList<>();
-        if (row.has("argument_types"))
-            for (String type : row.getList("argument_types", UTF8Type.instance))
-                argTypes.add(parseType(type));
-
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
-
-        String language = row.getString("language");
-        String body = row.getString("body");
-        boolean calledOnNullInput = row.getBoolean("called_on_null_input");
-
-        try
-        {
-            return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
-        }
-        catch (InvalidRequestException e)
-        {
-            return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e);
-        }
-    }
-
-    /*
-     * Reading UDAs
-     */
-
-    private static Collection<Aggregate> readAggregates(Functions functions, String keyspaceName)
-    {
-        String query = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_AGGREGATES);
-        HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create();
-        query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance)));
-
-        Collection<Aggregate> aggregates = new ArrayList<>();
-        aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(functions, keyspaceName, pair.getKey(), pair.getValue())));
-        return aggregates;
-    }
-
-    private static Aggregate readAggregate(Functions functions, String keyspaceName, String aggregateName, List<String> signature)
-    {
-        long timestamp = readAggregateTimestamp(keyspaceName, aggregateName, signature);
-        UDAggregate metadata = readAggregateMetadata(functions, keyspaceName, aggregateName, signature);
-        return new Aggregate(timestamp, metadata);
-    }
-
-    private static long readAggregateTimestamp(String keyspaceName, String aggregateName, List<String> signature)
-    {
-        String query = format("SELECT writeTime(return_type) AS timestamp " +
-                              "FROM %s.%s " +
-                              "WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_AGGREGATES);
-        return query(query, keyspaceName, aggregateName, signature).one().getLong("timestamp");
-    }
-
-    private static UDAggregate readAggregateMetadata(Functions functions, String keyspaceName, String functionName, List<String> signature)
-    {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
-                              SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                              SystemKeyspace.LEGACY_AGGREGATES);
-        UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
-
-        FunctionName name = new FunctionName(keyspaceName, functionName);
-
-        List<String> types = row.getList("argument_types", UTF8Type.instance);
-
-        List<AbstractType<?>> argTypes = new ArrayList<>();
-        if (types != null)
-        {
-            argTypes = new ArrayList<>(types.size());
-            for (String type : types)
-                argTypes.add(parseType(type));
-        }
-
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
-
-        FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func"));
-        AbstractType<?> stateType = parseType(row.getString("state_type"));
-        FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName, row.getString("final_func")) : null;
-        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
-
-        try
-        {
-            return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
-        }
-        catch (InvalidRequestException reason)
-        {
-            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
-        }
-    }
-
-    private static UntypedResultSet query(String query, Object... values)
-    {
-        return QueryProcessor.executeOnceInternal(query, values);
-    }
-
-    private static AbstractType<?> parseType(String str)
-    {
-        return TypeParser.parse(str);
-    }
-
-    private static final class Keyspace
-    {
-        final long timestamp;
-        final String name;
-        final KeyspaceParams params;
-        final Collection<Table> tables;
-        final Collection<Type> types;
-        final Collection<Function> functions;
-        final Collection<Aggregate> aggregates;
-
-        Keyspace(long timestamp,
-                 String name,
-                 KeyspaceParams params,
-                 Collection<Table> tables,
-                 Collection<Type> types,
-                 Collection<Function> functions,
-                 Collection<Aggregate> aggregates)
-        {
-            this.timestamp = timestamp;
-            this.name = name;
-            this.params = params;
-            this.tables = tables;
-            this.types = types;
-            this.functions = functions;
-            this.aggregates = aggregates;
-        }
-    }
-
-    private static final class Table
-    {
-        final long timestamp;
-        final CFMetaData metadata;
-
-        Table(long timestamp, CFMetaData metadata)
-        {
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-    }
-
-    private static final class Type
-    {
-        final long timestamp;
-        final UserType metadata;
-
-        Type(long timestamp, UserType metadata)
-        {
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-    }
-
-    private static final class Function
-    {
-        final long timestamp;
-        final UDFunction metadata;
-
-        Function(long timestamp, UDFunction metadata)
-        {
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-    }
-
-    private static final class Aggregate
-    {
-        final long timestamp;
-        final UDAggregate metadata;
-
-        Aggregate(long timestamp, UDAggregate metadata)
-        {
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7aa926e..8944b7c 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -106,7 +106,7 @@ public abstract class AbstractReadExecutor
             if (traceState != null)
                 traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
-            MessageOut<ReadCommand> message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+            MessageOut<ReadCommand> message = readCommand.createMessage();
             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
         }
 
@@ -291,8 +291,7 @@ public abstract class AbstractReadExecutor
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
-                int version = MessagingService.instance().getVersion(extraReplica);
-                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
                 speculated = true;
 
                 cfs.metric.speculativeRetries.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a4e18c0..54fa7e2 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -454,10 +454,6 @@ public class CacheService implements CacheServiceMBean
     {
         public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
         {
-            //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity
-            //https://issues.apache.org/jira/browse/CASSANDRA-10778
-            if (!key.desc.version.storeRows()) return;
-
             RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
             if (entry == null)
                 return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..b41cc00 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,7 +46,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -59,14 +58,12 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.hints.LegacyHintsMigrator;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.schema.LegacySchemaMigrator;
 import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -205,18 +202,6 @@ public class CassandraDaemon
             exitOrFail(e.returnCode, e.getMessage(), e.getCause());
         }
 
-        try
-        {
-            if (SystemKeyspace.snapshotOnVersionChange())
-            {
-                SystemKeyspace.migrateDataDirs();
-            }
-        }
-        catch (IOException e)
-        {
-            exitOrFail(3, e.getMessage(), e.getCause());
-        }
-
         // We need to persist this as soon as possible after startup checks.
         // This should be the first write to SystemKeyspace (CASSANDRA-11742)
         SystemKeyspace.persistLocalMetadata();
@@ -249,13 +234,6 @@ public class CassandraDaemon
             }
         });
 
-        /*
-         * Migrate pre-3.0 keyspaces, tables, types, functions, and aggregates, to their new 3.0 storage.
-         * We don't (and can't) wait for commit log replay here, but we don't need to - all schema changes force
-         * explicit memtable flushes.
-         */
-        LegacySchemaMigrator.migrate();
-
         // Populate token metadata before flushing, for token-aware sstable partitioning (#6696)
         StorageService.instance.populateTokenMetadata();
 
@@ -333,12 +311,6 @@ public class CassandraDaemon
         // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293)
         StorageService.instance.populateTokenMetadata();
 
-        // migrate any legacy (pre-3.0) hints from system.hints table into the new store
-        new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
-
-        // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
-        LegacyBatchlogMigrator.migrate();
-
         // enable auto compaction
         for (Keyspace keyspace : Keyspace.all())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1..48ad2c6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -512,7 +512,7 @@ public class DataResolver extends ResponseResolver
                 if (StorageProxy.canDoLocalRequest(source))
                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
-                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
 
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 11c0b12..6e0fadb 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -247,10 +247,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 for (InetAddress endpoint : endpoints)
-                {
-                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
-                    MessagingService.instance().sendRR(message, endpoint, repairHandler);
-                }
+                    MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 83971dd..75f7788 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -259,14 +259,15 @@ public class StartupChecks
 
             FileVisitor<Path> sstableVisitor = new SimpleFileVisitor<Path>()
             {
-                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
                 {
-                    if (!Descriptor.isValidFile(file.getFileName().toString()))
+                    File file = path.toFile();
+                    if (!Descriptor.isValidFile(file))
                         return FileVisitResult.CONTINUE;
 
                     try
                     {
-                        if (!Descriptor.fromFilename(file.toString()).isCompatible())
+                        if (!Descriptor.fromFilename(file).isCompatible())
                             invalid.add(file.toString());
                     }
                     catch (Exception e)
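
The hunk above switches the startup scan to the File-based Descriptor.isValidFile/fromFilename overloads shown in the diff. A self-contained sketch of the same visitor shape, with placeholder predicates standing in for Cassandra's Descriptor checks:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch; looksLikeSSTable/isCompatible are placeholders only.
    public class SSTableScanSketch
    {
        public static List<String> findIncompatible(Path dataDir) throws IOException
        {
            List<String> invalid = new ArrayList<>();
            Files.walkFileTree(dataDir, new SimpleFileVisitor<Path>()
            {
                @Override
                public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
                {
                    File file = path.toFile();            // new API works on File, not String
                    if (!looksLikeSSTable(file))
                        return FileVisitResult.CONTINUE;  // skip non-sstable files
                    if (!isCompatible(file))
                        invalid.add(file.toString());
                    return FileVisitResult.CONTINUE;
                }
            });
            return invalid;
        }

        private static boolean looksLikeSSTable(File f) { return f.getName().endsWith("-Data.db"); }
        private static boolean isCompatible(File f)     { return true; } // placeholder check
    }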

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0be68c..77862d6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -909,10 +908,10 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime));
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -969,33 +968,19 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
-        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints,
                                                                      Collections.<InetAddress>emptyList(),
-                                                                     endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+                                                                     endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
                                                                      Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME),
                                                                      null,
                                                                      WriteType.BATCH_LOG,
                                                                      queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
-
-        if (!endpoints.current.isEmpty())
-            syncWriteToBatchlog(handler, batch, endpoints.current);
-
-        if (!endpoints.legacy.isEmpty())
-            LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
-
-        handler.get();
-    }
-
-    private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
-    throws WriteTimeoutException, WriteFailureException
-    {
         MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-
         for (InetAddress target : endpoints)
         {
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
@@ -1005,15 +990,7 @@ public class StorageProxy implements StorageProxyMBean
             else
                 MessagingService.instance().sendRR(message, target, handler);
         }
-    }
-
-    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
-    {
-        if (!endpoints.current.isEmpty())
-            asyncRemoveFromBatchlog(endpoints.current, uuid);
-
-        if (!endpoints.legacy.isEmpty())
-            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime);
+        handler.get();
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
@@ -1160,38 +1137,13 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /*
-     * A class to filter batchlog endpoints into legacy endpoints (version < 3.0) or not.
-     */
-    private static final class BatchlogEndpoints
-    {
-        public final Collection<InetAddress> all;
-        public final Collection<InetAddress> current;
-        public final Collection<InetAddress> legacy;
-
-        BatchlogEndpoints(Collection<InetAddress> endpoints)
-        {
-            all = endpoints;
-            current = new ArrayList<>(2);
-            legacy = new ArrayList<>(2);
-
-            for (InetAddress ep : endpoints)
-            {
-                if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30)
-                    current.add(ep);
-                else
-                    legacy.add(ep);
-            }
-        }
-    }
-
-    /*
      * Replicas are picked manually:
      * - replicas should be alive according to the failure detector
      * - replicas should be in the local datacenter
      * - choose min(2, number of qualifying candidates above)
      * - allow the local node to be the only replica only if it's a single-node DC
      */
-    private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
     throws UnavailableException
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1202,12 +1154,12 @@ public class StorageProxy implements StorageProxyMBean
         if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)
-                return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress()));
+                return Collections.singleton(FBUtilities.getBroadcastAddress());
 
             throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
         }
 
-        return new BatchlogEndpoints(chosenEndpoints);
+        return chosenEndpoints;
     }
 
     /**
@@ -1816,9 +1768,8 @@ public class StorageProxy implements StorageProxyMBean
 
                 for (InetAddress endpoint : executor.getContactedReplicas())
                 {
-                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing full data read to {}", endpoint);
-                    MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
+                    MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, repairHandler);
                 }
             }
         }
@@ -2218,9 +2169,8 @@ public class StorageProxy implements StorageProxyMBean
             {
                 for (InetAddress endpoint : toQuery.filteredEndpoints)
                 {
-                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing request to {}", endpoint);
-                    MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+                    MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), endpoint, handler);
                 }
             }
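
Most of the StorageProxy churn above comes from deleting the BatchlogEndpoints helper, which split the chosen endpoints into 3.0+ ("current") and older ("legacy") peers so each group could be written through a different batchlog path. A short sketch of the partitioning it used to perform, with a plain Predicate standing in for the MessagingService version lookup:

    import java.net.InetAddress;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    // Sketch of the removed partitioning step; not Cassandra's actual helper.
    final class BatchlogPartitionSketch
    {
        // true -> peers on the current batchlog format, false -> legacy peers
        static Map<Boolean, List<InetAddress>> partition(Collection<InetAddress> endpoints,
                                                         Predicate<InetAddress> isCurrentVersion)
        {
            return endpoints.stream().collect(Collectors.partitioningBy(isCurrentVersion));
        }
    }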
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 07eb1d8..62efed2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -257,12 +257,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName);
 
+        ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler();
+
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, readHandler);
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, readHandler);
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, readHandler);
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
@@ -2082,8 +2084,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isRpcReady(InetAddress endpoint)
     {
-        return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 ||
-                Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
+        return Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
     }
 
     public void setRpcReady(boolean value)
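
The verb-handler change above registers a single ReadCommandVerbHandler instance for READ, RANGE_SLICE and PAGED_RANGE instead of a dedicated RangeSliceVerbHandler. A toy sketch of that consolidation, with simplified stand-ins for the verb registry rather than Cassandra's MessagingService:

    import java.util.EnumMap;
    import java.util.Map;

    // Illustration only: one handler instance serves several read-style verbs.
    final class VerbRegistrySketch
    {
        enum Verb { READ, RANGE_SLICE, PAGED_RANGE }
        interface Handler { void handle(String message); }

        private final Map<Verb, Handler> handlers = new EnumMap<>(Verb.class);

        void register(Verb verb, Handler handler) { handlers.put(verb, handler); }

        void demo()
        {
            // Range queries no longer need their own handler type: the same
            // read handler is registered for all three verbs.
            Handler readHandler = msg -> System.out.println("read: " + msg);
            register(Verb.READ, readHandler);
            register(Verb.RANGE_SLICE, readHandler);
            register(Verb.PAGED_RANGE, readHandler);
        }
    }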

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index af94869..3b0364c 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -113,32 +113,20 @@ public class Commit
     {
         public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out);
-
             UUIDSerializer.serializer.serialize(commit.ballot, out, version);
             PartitionUpdate.serializer.serialize(commit.update, out, version);
         }
 
         public Commit deserialize(DataInputPlus in, int version) throws IOException
         {
-            ByteBuffer key = null;
-            if (version < MessagingService.VERSION_30)
-                key = ByteBufferUtil.readWithShortLength(in);
-
             UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
-            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key);
+            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL);
             return new Commit(ballot, update);
         }
 
         public long serializedSize(Commit commit, int version)
         {
-            int size = 0;
-            if (version < MessagingService.VERSION_30)
-                size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey());
-
-            return size
-                 + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
+            return UUIDSerializer.serializer.serializedSize(commit.ballot, version)
                  + PartitionUpdate.serializer.serializedSize(commit.update, version);
         }
     }
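
With every peer on 3.0+, the Commit serializer (like the PrepareResponse and FileMessageHeader serializers below) loses its version < VERSION_30 branches and becomes a straight-line write of the remaining fields. A generic sketch of that simplification using stand-in types, not Cassandra's actual Commit or serializer framework:

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.UUID;

    // The version parameter is still threaded through, but nothing branches on
    // pre-3.0 values any more.
    final class VersionedSerializerSketch
    {
        static void serialize(UUID ballot, byte[] update, DataOutput out, int version) throws IOException
        {
            // No legacy preamble: ballot then update, for every supported version.
            out.writeLong(ballot.getMostSignificantBits());
            out.writeLong(ballot.getLeastSignificantBits());
            out.writeInt(update.length);
            out.write(update);
        }

        static long serializedSize(UUID ballot, byte[] update, int version)
        {
            return 8 + 8 + 4 + update.length;   // fixed-size ballot + length-prefixed update
        }
    }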

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index f843b8d..d8699c8 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -69,51 +69,22 @@ public class PrepareResponse
         {
             out.writeBoolean(response.promised);
             Commit.serializer.serialize(response.inProgressCommit, out, version);
-
-            if (version < MessagingService.VERSION_30)
-            {
-                UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
-                PartitionUpdate.serializer.serialize(response.mostRecentCommit.update, out, version);
-            }
-            else
-            {
-                Commit.serializer.serialize(response.mostRecentCommit, out, version);
-            }
+            Commit.serializer.serialize(response.mostRecentCommit, out, version);
         }
 
         public PrepareResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             boolean success = in.readBoolean();
             Commit inProgress = Commit.serializer.deserialize(in, version);
-            Commit mostRecent;
-            if (version < MessagingService.VERSION_30)
-            {
-                UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
-                PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, inProgress.update.partitionKey());
-                mostRecent = new Commit(ballot, update);
-            }
-            else
-            {
-                mostRecent = Commit.serializer.deserialize(in, version);
-            }
+            Commit mostRecent = Commit.serializer.deserialize(in, version);
             return new PrepareResponse(success, inProgress, mostRecent);
         }
 
         public long serializedSize(PrepareResponse response, int version)
         {
-            long size = TypeSizes.sizeof(response.promised)
-                      + Commit.serializer.serializedSize(response.inProgressCommit, version);
-
-            if (version < MessagingService.VERSION_30)
-            {
-                size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version);
-                size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version);
-            }
-            else
-            {
-                size += Commit.serializer.serializedSize(response.mostRecentCommit, version);
-            }
-            return size;
+            return TypeSizes.sizeof(response.promised)
+                 + Commit.serializer.serializedSize(response.inProgressCommit, version)
+                 + Commit.serializer.serializedSize(response.mostRecentCommit, version);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 6465bf7..fab9372 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -196,16 +196,7 @@ public class StreamReader
                                   long totalSize, UUID sessionId) throws IOException
         {
             this.metadata = metadata;
-            // streaming pre-3.0 sstables require mark/reset support from source stream
-            if (version.correspondingMessagingVersion() < MessagingService.VERSION_30)
-            {
-                logger.trace("Initializing rewindable input stream for reading legacy sstable with {} bytes with following " +
-                             "parameters: initial_mem_buffer_size={}, max_mem_buffer_size={}, max_spill_file_size={}.",
-                             totalSize, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, MAX_SPILL_FILE_SIZE);
-                File bufferFile = getTempBufferFile(metadata, totalSize, sessionId);
-                this.in = new RewindableDataInputStreamPlus(in, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, bufferFile, MAX_SPILL_FILE_SIZE);
-            } else
-                this.in = new DataInputPlus.DataInputStreamPlus(in);
+            this.in = new DataInputPlus.DataInputStreamPlus(in);
             this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
         }
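
Per the removed comment, streaming pre-3.0 sstables required mark/reset support from the source stream, hence the rewindable wrapper; with only current-format streams accepted, a plain forward-only stream suffices. A standalone sketch of that wrapping decision, with BufferedInputStream standing in for the removed RewindableDataInputStreamPlus:

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.InputStream;

    // Illustration only; not Cassandra's DataInputPlus hierarchy.
    final class StreamWrapSketch
    {
        static DataInputStream wrap(InputStream raw, boolean legacyFormat)
        {
            return legacyFormat
                 ? new DataInputStream(new BufferedInputStream(raw))  // mark/reset support
                 : new DataInputStream(raw);                          // what the code does now
        }
    }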

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 70b5765..2044d4d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -81,7 +82,7 @@ public class CompressedStreamReader extends StreamReader
                      cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
-                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
+                                                              ChecksumType.CRC32, cfs::getCrcCheckChance);
         TrackedInputStream in = new TrackedInputStream(cis);
 
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
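
The reader above stops deriving the checksum type from the incoming sstable version and hard-codes ChecksumType.CRC32, since only current-format streams can arrive. A tiny sketch of the fixed-choice checksum over a compressed chunk, using the JDK CRC32 directly rather than Cassandra's ChecksumType:

    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    final class ChunkChecksumSketch
    {
        static long checksum(byte[] compressedChunk)
        {
            Checksum crc = new CRC32();             // fixed choice, no per-version lookup
            crc.update(compressedChunk, 0, compressedChunk.length);
            return crc.getValue();
        }
    }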

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 232727d..b0639ea 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -189,13 +189,7 @@ public class FileMessageHeader
             UUIDSerializer.serializer.serialize(header.cfId, out, version);
             out.writeInt(header.sequenceNumber);
             out.writeUTF(header.version.toString());
-
-            //We can't stream to a node that doesn't understand a new sstable format
-            if (version < StreamMessage.VERSION_22 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG)
-                throw new UnsupportedOperationException("Can't stream non-legacy sstables to nodes < 2.2");
-
-            if (version >= StreamMessage.VERSION_22)
-                out.writeUTF(header.format.name);
+            out.writeUTF(header.format.name);
 
             out.writeLong(header.estimatedKeys);
             out.writeInt(header.sections.size());
@@ -212,8 +206,7 @@ public class FileMessageHeader
             out.writeLong(header.repairedAt);
             out.writeInt(header.sstableLevel);
 
-            if (version >= StreamMessage.VERSION_30 && header.version.storeRows())
-                SerializationHeader.serializer.serialize(header.version, header.header, out);
+            SerializationHeader.serializer.serialize(header.version, header.header, out);
             return compressionInfo;
         }
 
@@ -222,10 +215,7 @@ public class FileMessageHeader
             UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int sequenceNumber = in.readInt();
             Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
-
-            SSTableFormat.Type format = SSTableFormat.Type.LEGACY;
-            if (version >= StreamMessage.VERSION_22)
-                format = SSTableFormat.Type.validate(in.readUTF());
+            SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
 
             long estimatedKeys = in.readLong();
             int count = in.readInt();
@@ -235,9 +225,7 @@ public class FileMessageHeader
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
             int sstableLevel = in.readInt();
-            SerializationHeader.Component header = version >= StreamMessage.VERSION_30 && sstableVersion.storeRows()
-                                                 ? SerializationHeader.serializer.deserialize(sstableVersion, in)
-                                                 : null;
+            SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
 
             return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
         }
@@ -247,10 +235,7 @@ public class FileMessageHeader
             long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
             size += TypeSizes.sizeof(header.sequenceNumber);
             size += TypeSizes.sizeof(header.version.toString());
-
-            if (version >= StreamMessage.VERSION_22)
-                size += TypeSizes.sizeof(header.format.name);
-
+            size += TypeSizes.sizeof(header.format.name);
             size += TypeSizes.sizeof(header.estimatedKeys);
 
             size += TypeSizes.sizeof(header.sections.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 7487aaf..3ce1958 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,8 +33,6 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int VERSION_20 = 2;
-    public static final int VERSION_22 = 3;
     public static final int VERSION_30 = 4;
     public static final int CURRENT_VERSION = VERSION_30;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 070434d..52d5ecf 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -93,8 +93,8 @@ public class SSTableExport
      */
     public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException
     {
-        if (!desc.version.storeRows())
-            throw new IOException("pre-3.0 SSTable is not supported.");
+        if (!desc.version.isCompatible())
+            throw new IOException("Cannot process old and unsupported SSTable version.");
 
         EnumSet<MetadataType> types = EnumSet.of(MetadataType.STATS, MetadataType.HEADER);
         Map<MetadataType, MetadataComponent> sstableMetadata = desc.getMetadataSerializer().deserialize(desc, types);
@@ -162,11 +162,6 @@ public class SSTableExport
                         : cmd.getOptionValues(EXCLUDE_KEY_OPTION)));
         String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath();
 
-        if (Descriptor.isLegacyFile(new File(ssTableFileName)))
-        {
-            System.err.println("Unsupported legacy sstable");
-            System.exit(1);
-        }
         if (!new File(ssTableFileName).exists())
         {
             System.err.println("Cannot find file " + ssTableFileName);