You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/11/30 09:49:59 UTC
[06/11] cassandra git commit: Remove pre-3.0 compatibility code for
4.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
deleted file mode 100644
index d0fc151..0000000
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ /dev/null
@@ -1,1099 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.schema;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.FieldIdentifier;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.functions.FunctionName;
-import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.functions.UDFunction;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static java.lang.String.format;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-
-/**
- * This majestic class performs migration from legacy (pre-3.0) system.schema_* schema tables to the new and glorious
- * system_schema keyspace.
- *
- * The goal is to not lose any information in the migration - including the timestamps.
- */
-@SuppressWarnings("deprecation")
-public final class LegacySchemaMigrator
-{
- private LegacySchemaMigrator()
- {
-     // only static members are used; instances are never created
- }
-
- private static final Logger logger = LoggerFactory.getLogger(LegacySchemaMigrator.class);
-
- static final List<CFMetaData> LegacySchemaTables =
- ImmutableList.of(SystemKeyspace.LegacyKeyspaces,
- SystemKeyspace.LegacyColumnfamilies,
- SystemKeyspace.LegacyColumns,
- SystemKeyspace.LegacyTriggers,
- SystemKeyspace.LegacyUsertypes,
- SystemKeyspace.LegacyFunctions,
- SystemKeyspace.LegacyAggregates);
-
- /**
-  * Entry point of the migration: copies every keyspace returned by {@link #readSchema()} into the new schema
-  * keyspace, then truncates and unloads the legacy schema tables. When there is nothing to copy, the legacy
-  * tables are merely unloaded.
-  */
- public static void migrate()
- {
-     // everything the legacy schema tables still know about
-     Collection<Keyspace> legacyKeyspaces = readSchema();
-
-     // already upgraded, or a brand new 3.0 node: nothing to move, just hide the legacy tables
-     if (legacyKeyspaces.isEmpty())
-     {
-         unloadLegacySchemaTables();
-         return;
-     }
-
-     logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
-                 legacyKeyspaces.size(),
-                 SchemaConstants.SCHEMA_KEYSPACE_NAME);
-
-     // write all metadata to the new schema tables first, then carry over the index build status
-     for (Keyspace legacyKeyspace : legacyKeyspaces)
-         storeKeyspaceInNewSchemaTables(legacyKeyspace);
-     for (Keyspace legacyKeyspace : legacyKeyspaces)
-         migrateBuiltIndexesForKeyspace(legacyKeyspace);
-
-     // the new tables must be flushed before the old ones are truncated
-     SchemaKeyspace.flush();
-
-     // truncate the original tables (will be snapshotted now, and will have been snapshotted by pre-flight checks)
-     logger.info("Truncating legacy schema tables");
-     truncateLegacySchemaTables();
-
-     // drop the legacy tables from Schema, so that their presence doesn't give the users any wrong ideas
-     unloadLegacySchemaTables();
-
-     logger.info("Completed migration of legacy schema tables");
- }
-
- /** Carries over the index build status of every table of the given keyspace. */
- private static void migrateBuiltIndexesForKeyspace(Keyspace keyspace)
- {
-     for (Table table : keyspace.tables)
-         migrateBuiltIndexesForTable(table);
- }
-
- /** Carries over the index build status of every index declared on the given table. */
- private static void migrateBuiltIndexesForTable(Table table)
- {
-     for (IndexMetadata index : table.metadata.getIndexes())
-         migrateIndexBuildStatus(table.metadata.ksName, table.metadata.cfName, index);
- }
-
- /**
-  * If the index is recorded as built under its legacy "table.index" name, records it as built under the plain
-  * index name and removes the legacy record.
-  */
- private static void migrateIndexBuildStatus(String keyspace, String table, IndexMetadata index)
- {
-     if (!SystemKeyspace.isIndexBuilt(keyspace, table + '.' + index.name))
-         return;
-
-     SystemKeyspace.setIndexBuilt(keyspace, index.name);
-     SystemKeyspace.setIndexRemoved(keyspace, table + '.' + index.name);
- }
-
- /**
-  * Removes the legacy schema tables from {@code Schema}: unloads each of them, invalidates their stores, and
-  * replaces the system keyspace metadata with a copy that no longer lists them.
-  */
- static void unloadLegacySchemaTables()
- {
-     KeyspaceMetadata currentSystemKeyspace = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-
-     Tables remainingTables = currentSystemKeyspace.tables;
-     for (CFMetaData legacyTable : LegacySchemaTables)
-         remainingTables = remainingTables.without(legacyTable.cfName);
-
-     for (CFMetaData legacyTable : LegacySchemaTables)
-         Schema.instance.unload(legacyTable);
-
-     for (CFMetaData legacyTable : LegacySchemaTables)
-         org.apache.cassandra.db.Keyspace.openAndGetStore(legacyTable).invalidate();
-
-     Schema.instance.setKeyspaceMetadata(currentSystemKeyspace.withSwapped(remainingTables));
- }
-
- /** Truncates (blocking) the store of each legacy schema table. */
- private static void truncateLegacySchemaTables()
- {
-     for (CFMetaData legacyTable : LegacySchemaTables)
-         Schema.instance.getColumnFamilyStoreInstance(legacyTable.cfId).truncateBlocking();
- }
-
- /**
-  * Builds and applies a single mutation that writes the keyspace, and each of its tables, types, functions and
-  * aggregates, to the new schema tables, each with the timestamp read from the legacy tables.
-  */
- private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
- {
-     logger.info("Migrating keyspace {}", keyspace);
-
-     Mutation.SimpleBuilder mutation = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp);
-
-     for (Table table : keyspace.tables)
-     {
-         SchemaKeyspace.addTableToSchemaMutation(table.metadata, true, mutation.timestamp(table.timestamp));
-     }
-
-     for (Type type : keyspace.types)
-     {
-         SchemaKeyspace.addTypeToSchemaMutation(type.metadata, mutation.timestamp(type.timestamp));
-     }
-
-     for (Function function : keyspace.functions)
-     {
-         SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, mutation.timestamp(function.timestamp));
-     }
-
-     for (Aggregate aggregate : keyspace.aggregates)
-     {
-         SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, mutation.timestamp(aggregate.timestamp));
-     }
-
-     mutation.build().apply();
- }
-
- /*
-  * Reads the metadata of all keyspaces listed in the legacy keyspaces table (including nested tables, types,
-  * functions and aggregates), with their modification timestamps. Names in SYSTEM_KEYSPACE_NAMES are skipped.
-  */
- private static Collection<Keyspace> readSchema()
- {
-     String cql = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_KEYSPACES);
-
-     Collection<String> names = new ArrayList<>();
-     for (UntypedResultSet.Row row : query(cql))
-         names.add(row.getString("keyspace_name"));
-     names.removeAll(SchemaConstants.SYSTEM_KEYSPACE_NAMES);
-
-     Collection<Keyspace> result = new ArrayList<>();
-     for (String name : names)
-         result.add(readKeyspace(name));
-     return result;
- }
-
- /**
-  * Reads one keyspace from the legacy tables: its timestamp and params, then its tables, types, functions and
-  * aggregates. The functions read are handed to the aggregate reader.
-  */
- private static Keyspace readKeyspace(String keyspaceName)
- {
-     long timestamp = readKeyspaceTimestamp(keyspaceName);
-     KeyspaceParams params = readKeyspaceParams(keyspaceName);
-
-     Collection<Table> tables = readTables(keyspaceName);
-     Collection<Type> types = readTypes(keyspaceName);
-     Collection<Function> functions = readFunctions(keyspaceName);
-
-     // the aggregates are read against the set of functions just loaded
-     Functions.Builder knownFunctions = Functions.builder();
-     for (Function function : functions)
-         knownFunctions.add(function.metadata);
-     Collection<Aggregate> aggregates = readAggregates(knownFunctions.build(), keyspaceName);
-
-     return new Keyspace(timestamp, keyspaceName, params, tables, types, functions, aggregates);
- }
-
- /*
- * Reading keyspace params
- */
-
- /** Returns the write time of the durable_writes column of the keyspace's row in the legacy keyspaces table. */
- private static long readKeyspaceTimestamp(String keyspaceName)
- {
-     String cql = format("SELECT writeTime(durable_writes) AS timestamp FROM %s.%s WHERE keyspace_name = ?",
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_KEYSPACES);
-     UntypedResultSet.Row row = query(cql, keyspaceName).one();
-     return row.getLong("timestamp");
- }
-
- /**
-  * Builds the keyspace params from the legacy row: durable_writes, plus a replication map made of the JSON
-  * strategy_options with the strategy_class stored under {@code ReplicationParams.CLASS}.
-  */
- private static KeyspaceParams readKeyspaceParams(String keyspaceName)
- {
-     String cql = format("SELECT * FROM %s.%s WHERE keyspace_name = ?",
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_KEYSPACES);
-     UntypedResultSet.Row legacyRow = query(cql, keyspaceName).one();
-
-     boolean durableWrites = legacyRow.getBoolean("durable_writes");
-
-     Map<String, String> replication = new HashMap<>(fromJsonMap(legacyRow.getString("strategy_options")));
-     replication.put(ReplicationParams.CLASS, legacyRow.getString("strategy_class"));
-
-     return KeyspaceParams.create(durableWrites, replication);
- }
-
- /*
- * Reading tables
- */
-
- /** Reads every table of the keyspace that is listed in the legacy columnfamilies table. */
- private static Collection<Table> readTables(String keyspaceName)
- {
-     String cql = format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name = ?",
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_COLUMNFAMILIES);
-
-     // collect all the names first, then read the tables one by one
-     Collection<String> names = new ArrayList<>();
-     for (UntypedResultSet.Row row : query(cql, keyspaceName))
-         names.add(row.getString("columnfamily_name"));
-
-     Collection<Table> result = new ArrayList<>();
-     for (String name : names)
-         result.add(readTable(keyspaceName, name));
-     return result;
- }
-
- /** Reads the timestamp, then the metadata, of a single legacy table. */
- private static Table readTable(String keyspaceName, String tableName)
- {
-     return new Table(readTableTimestamp(keyspaceName, tableName),
-                      readTableMetadata(keyspaceName, tableName));
- }
-
- /** Returns the write time of the type column of the table's row in the legacy columnfamilies table. */
- private static long readTableTimestamp(String keyspaceName, String tableName)
- {
-     String cql = format("SELECT writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_COLUMNFAMILIES);
-     UntypedResultSet.Row row = query(cql, keyspaceName, tableName).one();
-     return row.getLong("timestamp");
- }
-
- /**
-  * Fetches the table row, the column rows and the trigger rows of one table from the legacy tables (in that
-  * order) and decodes them into a {@code CFMetaData}.
-  */
- private static CFMetaData readTableMetadata(String keyspaceName, String tableName)
- {
-     // the three lookups only differ in the legacy table they target
-     String selectByTable = "SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?";
-     String systemKeyspace = SchemaConstants.SYSTEM_KEYSPACE_NAME;
-
-     UntypedResultSet.Row tableRow =
-         query(format(selectByTable, systemKeyspace, SystemKeyspace.LEGACY_COLUMNFAMILIES), keyspaceName, tableName).one();
-
-     UntypedResultSet columnRows =
-         query(format(selectByTable, systemKeyspace, SystemKeyspace.LEGACY_COLUMNS), keyspaceName, tableName);
-
-     UntypedResultSet triggerRows =
-         query(format(selectByTable, systemKeyspace, SystemKeyspace.LEGACY_TRIGGERS), keyspaceName, tableName);
-
-     return decodeTableMetadata(tableRow, columnRows, triggerRows);
- }
-
- private static CFMetaData decodeTableMetadata(UntypedResultSet.Row tableRow,
- UntypedResultSet columnRows,
- UntypedResultSet triggerRows)
- {
- String ksName = tableRow.getString("keyspace_name");
- String cfName = tableRow.getString("columnfamily_name");
-
- AbstractType<?> rawComparator = TypeParser.parse(tableRow.getString("comparator"));
- AbstractType<?> subComparator = tableRow.has("subcomparator") ? TypeParser.parse(tableRow.getString("subcomparator")) : null;
-
- boolean isSuper = "super".equals(tableRow.getString("type").toLowerCase(Locale.ENGLISH));
- boolean isCompound = rawComparator instanceof CompositeType || isSuper;
-
- /*
- * Determine whether or not the table is *really* dense
- * We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
- * but we can trust is_dense value of false.
- * Consequently: an explicit false is taken as-is, while a missing value or a value of true is re-derived
- * through calculateIsDense().
- */
- Boolean rawIsDense = tableRow.has("is_dense") ? tableRow.getBoolean("is_dense") : null;
- boolean isDense;
- if (rawIsDense != null && !rawIsDense)
- isDense = false;
- else
- isDense = calculateIsDense(rawComparator, columnRows);
-
- // Now, if the table turned out to be sparse while is_dense was missing or true, remove the redundant compact_value
- // column and the last clustering column, directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
- Iterable<UntypedResultSet.Row> filteredColumnRows = !isDense && (rawIsDense == null || rawIsDense)
- ? filterOutRedundantRowsForSparse(columnRows, isSuper, isCompound)
- : columnRows;
-
- // The default validator is only kept for backward compatibility; here it serves to detect counter tables (and is passed on to addDefinitionForUpgrade() below)
- AbstractType<?> defaultValidator = TypeParser.parse(tableRow.getString("default_validator"));
- boolean isCounter = defaultValidator instanceof CounterColumnType;
-
- /*
- * With CASSANDRA-5202 we stopped inferring the cf id from the combination of keyspace/table names,
- * and started storing the generated uuids in system.schema_columnfamilies.
- *
- * In 3.0 we SHOULD NOT see tables like that (2.0-created, non-upgraded).
- * But in the off-chance that we do, we generate the deterministic uuid here.
- */
- UUID cfId = tableRow.has("cf_id")
- ? tableRow.getUUID("cf_id")
- : CFMetaData.generateLegacyCfId(ksName, cfName);
-
- boolean isCQLTable = !isSuper && !isDense && isCompound;
- boolean isStaticCompactTable = !isDense && !isCompound;
-
- // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from
- // previous versions, they may not have the expected schema, so detect if we need to upgrade and do
- // it in createColumnsFromColumnRows.
- // We can remove this once we don't support upgrade from versions < 3.0.
- boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(filteredColumnRows, isSuper, isStaticCompactTable);
-
- List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(filteredColumnRows,
- ksName,
- cfName,
- rawComparator,
- subComparator,
- isSuper,
- isCQLTable,
- isStaticCompactTable,
- needsUpgrade);
-
- if (needsUpgrade)
- {
- addDefinitionForUpgrade(columnDefs,
- ksName,
- cfName,
- isStaticCompactTable,
- isSuper,
- rawComparator,
- subComparator,
- defaultValidator);
- }
-
- CFMetaData cfm = CFMetaData.create(ksName,
- cfName,
- cfId,
- isDense,
- isCompound,
- isSuper,
- isCounter,
- false, // legacy schema did not contain views
- columnDefs,
- DatabaseDescriptor.getPartitioner());
-
- Indexes indexes = createIndexesFromColumnRows(cfm,
- filteredColumnRows,
- ksName,
- cfName,
- rawComparator,
- subComparator,
- isSuper,
- isCQLTable,
- isStaticCompactTable,
- needsUpgrade);
- cfm.indexes(indexes);
-
- if (tableRow.has("dropped_columns"))
- addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance));
-
- return cfm.params(decodeTableParams(tableRow))
- .triggers(createTriggersFromTriggerRows(triggerRows));
- }
-
- /*
- * We call dense a CF for which each component of the comparator is a clustering column, i.e. no
- * component is used to store regular column names. In other words, non-composite static "thrift"
- * and CQL3 CF are *not* dense.
- * We save whether the table is dense or not during table creation through CQL, but we don't have this
- * information for tables created through thrift, nor for tables created prior to CASSANDRA-7744, so this
- * method does its best to infer whether the table is dense or not based on other elements.
- */
- private static boolean calculateIsDense(AbstractType<?> comparator, UntypedResultSet columnRows)
- {
- /*
- * As said above, this method is only here because we need to deal with thrift upgrades.
- * Once a CF has been "upgraded", i.e. we've rebuilt and saved its CQL3 metadata at least once,
- * then we'll have saved the "is_dense" value and will be good to go.
- *
- * But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
- * to infer that information without relying on it in that case. And for the most part this is
- * easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
- * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
- * PRIMARY KEY defined.
- *
- * So we need to recognize those special case CQL3 table with only a primary key. If we have some
- * clustering columns, we're fine as said above. So the only problem is that we cannot decide for
- * sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
- * has been created in CQL3 by say:
- * CREATE TABLE test (k int PRIMARY KEY)
- * in which case it should not be dense. However, we can limit our margin of error by assuming we are
- * in the latter case only if the comparator is exactly CompositeType(UTF8Type).
- *
- * In terms of the code below:
- * - any column row of type "regular" => not dense;
- * - otherwise, if there are "clustering_key" rows => dense iff the highest component_index (0 when the
- * column is absent) equals comparator.componentsCount() - 1;
- * - otherwise => dense unless isCQL3OnlyPKComparator(comparator) holds.
- */
- for (UntypedResultSet.Row columnRow : columnRows)
- if ("regular".equals(columnRow.getString("type")))
- return false;
-
- int maxClusteringIdx = -1;
- for (UntypedResultSet.Row columnRow : columnRows)
- if ("clustering_key".equals(columnRow.getString("type")))
- maxClusteringIdx = Math.max(maxClusteringIdx, columnRow.has("component_index") ? columnRow.getInt("component_index") : 0);
-
- return maxClusteringIdx >= 0
- ? maxClusteringIdx == comparator.componentsCount() - 1
- : !isCQL3OnlyPKComparator(comparator);
- }
-
- /**
-  * Returns the column rows without the ones that are redundant for a sparse table: every "compact_value" row,
-  * and, among the "clustering_key" rows, those at a non-zero position for super tables, or all of them for
-  * tables that are neither super nor compound.
-  */
- private static Iterable<UntypedResultSet.Row> filterOutRedundantRowsForSparse(UntypedResultSet columnRows, boolean isSuper, boolean isCompound)
- {
-     Collection<UntypedResultSet.Row> kept = new ArrayList<>();
-     for (UntypedResultSet.Row columnRow : columnRows)
-     {
-         String kind = columnRow.getString("type");
-
-         boolean redundant;
-         if ("compact_value".equals(kind))
-         {
-             redundant = true;
-         }
-         else if ("clustering_key".equals(kind))
-         {
-             int position = columnRow.has("component_index") ? columnRow.getInt("component_index") : 0;
-             redundant = isSuper ? position != 0 : !isCompound;
-         }
-         else
-         {
-             redundant = false;
-         }
-
-         if (!redundant)
-             kept.add(columnRow);
-     }
-
-     return kept;
- }
-
- /** Tells whether the comparator is a CompositeType with exactly one component, that component being a UTF8Type. */
- private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
- {
-     if (comparator instanceof CompositeType)
-     {
-         CompositeType composite = (CompositeType) comparator;
-         boolean singleComponent = composite.types.size() == 1;
-         return singleComponent && composite.types.get(0) instanceof UTF8Type;
-     }
-
-     return false;
- }
-
- /**
-  * Translates the table-level options of a legacy columnfamilies row into {@code TableParams}. Options guarded
-  * by {@code row.has(...)} are only set when the row carries them.
-  */
- private static TableParams decodeTableParams(UntypedResultSet.Row row)
- {
-     TableParams.Builder builder = TableParams.builder();
-
-     builder.readRepairChance(row.getDouble("read_repair_chance"));
-     builder.dcLocalReadRepairChance(row.getDouble("local_read_repair_chance"));
-     builder.gcGraceSeconds(row.getInt("gc_grace_seconds"));
-
-     if (row.has("comment"))
-         builder.comment(row.getString("comment"));
-
-     if (row.has("memtable_flush_period_in_ms"))
-         builder.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"));
-
-     Map<String, String> caching = fromJsonMap(row.getString("caching"));
-     builder.caching(CachingParams.fromMap(caching));
-
-     if (row.has("default_time_to_live"))
-         builder.defaultTimeToLive(row.getInt("default_time_to_live"));
-
-     if (row.has("speculative_retry"))
-         builder.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
-
-     // crc_check_chance was promoted from a compression property to a top-level property, so it is taken out
-     // of the compression map before the latter is decoded
-     Map<String, String> compression = fromJsonMap(row.getString("compression_parameters"));
-     String crcCheckChance = compression.remove("crc_check_chance");
-     if (crcCheckChance != null)
-         builder.crcCheckChance(Double.parseDouble(crcCheckChance));
-     builder.compression(CompressionParams.fromMap(compression));
-
-     builder.compaction(compactionFromRow(row));
-
-     if (row.has("min_index_interval"))
-         builder.minIndexInterval(row.getInt("min_index_interval"));
-
-     if (row.has("max_index_interval"))
-         builder.maxIndexInterval(row.getInt("max_index_interval"));
-
-     if (row.has("bloom_filter_fp_chance"))
-         builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"));
-
-     return builder.build();
- }
-
- /*
-  * Builds the compaction params of a legacy table, migrating max_compaction_threshold and
-  * min_compaction_threshold into the compaction options map, where they belong.
-  *
-  * We must use reflection to validate the options because not every compaction strategy respects and supports
-  * the threshold params (LCS doesn't, STCS and DTCS do): the thresholds are only kept when the strategy's static
-  * validateOptions() reports no unrecognized option for the map that includes them.
-  */
- @SuppressWarnings("unchecked")
- private static CompactionParams compactionFromRow(UntypedResultSet.Row row)
- {
-     Class<? extends AbstractCompactionStrategy> strategyClass =
-         CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class"));
-     Map<String, String> declaredOptions = fromJsonMap(row.getString("compaction_strategy_options"));
-
-     int minThreshold = row.getInt("min_compaction_threshold");
-     int maxThreshold = row.getInt("max_compaction_threshold");
-
-     // explicitly declared thresholds win over the legacy top-level ones
-     Map<String, String> candidateOptions = new HashMap<>(declaredOptions);
-     candidateOptions.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold));
-     candidateOptions.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold));
-
-     Map<String, String> effectiveOptions = declaredOptions;
-     try
-     {
-         Object validationResult = strategyClass.getMethod("validateOptions", Map.class).invoke(null, candidateOptions);
-         Map<String, String> unrecognizedOptions = (Map<String, String>) validationResult;
-
-         if (unrecognizedOptions.isEmpty())
-             effectiveOptions = candidateOptions;
-     }
-     catch (Exception e)
-     {
-         throw new RuntimeException(e);
-     }
-
-     return CompactionParams.create(strategyClass, effectiveOptions);
- }
-
- // Should only be called on compact tables.
- // Tells whether the column definitions still have to be brought to the layout addDefinitionForUpgrade() completes.
- private static boolean checkNeedsUpgrade(Iterable<UntypedResultSet.Row> defs, boolean isSuper, boolean isStaticCompactTable)
- {
-     if (!isSuper)
-     {
-         // Static compact tables need the upgrade if the regular definitions haven't been converted to static
-         // yet, i.e. if there is no static definition; dense compact tables need it if there is no compact
-         // value definition.
-         return isStaticCompactTable
-              ? !hasKind(defs, ColumnDefinition.Kind.STATIC)
-              : !hasRegularColumns(defs);
-     }
-
-     // Super tables: a definition with an empty name means the "supercolumn map" column was already added
-     for (UntypedResultSet.Row definition : defs)
-     {
-         if (definition.getString("column_name").isEmpty())
-             return false;
-     }
-     return true;
- }
-
- /**
-  * Scans the column rows in order and returns true at the first one whose kind deserializes to REGULAR, unless
-  * an empty compact value column is met first, in which case false is returned straight away.
-  */
- private static boolean hasRegularColumns(Iterable<UntypedResultSet.Row> columnRows)
- {
-     for (UntypedResultSet.Row columnRow : columnRows)
-     {
-         /*
-          * The empty compact column (pre-3.0, COMPACT STORAGE, primary-key only tables) has to be special cased
-          * and ignored, since deserializeKind() would otherwise just report it as REGULAR.
-          * The proper EmptyType regular column is meant to be added by addDefinitionForUpgrade(), which requires
-          * checkNeedsUpgrade() to return true in this case.
-          * See CASSANDRA-9874.
-          */
-         if (isEmptyCompactValueColumn(columnRow))
-             return false;
-
-         ColumnDefinition.Kind kind = deserializeKind(columnRow.getString("type"));
-         if (kind == ColumnDefinition.Kind.REGULAR)
-             return true;
-     }
-
-     return false;
- }
-
- /** Tells whether the row describes a column of type "compact_value" whose name is empty. */
- private static boolean isEmptyCompactValueColumn(UntypedResultSet.Row row)
- {
-     if (!"compact_value".equals(row.getString("type")))
-         return false;
-
-     return row.getString("column_name").isEmpty();
- }
-
- /**
-  * Appends to {@code defs} the definitions a compact table lacks: the "supercolumn map" column for super
-  * tables, a default clustering column plus a default compact value column for static compact tables, and an
-  * EmptyType compact value column otherwise.
-  */
- private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
-                                             String ksName,
-                                             String cfName,
-                                             boolean isStaticCompactTable,
-                                             boolean isSuper,
-                                             AbstractType<?> rawComparator,
-                                             AbstractType<?> subComparator,
-                                             AbstractType<?> defaultValidator)
- {
-     CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
-
-     if (isSuper)
-     {
-         MapType<?, ?> superColumnMapType = MapType.getInstance(subComparator, defaultValidator, true);
-         defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, superColumnMapType));
-         return;
-     }
-
-     if (isStaticCompactTable)
-     {
-         defs.add(ColumnDefinition.clusteringDef(ksName, cfName, names.defaultClusteringName(), rawComparator, 0));
-         defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator));
-         return;
-     }
-
-     // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it
-     // (we use EmptyType to recognize that the compact value was not declared by the user (see CreateTableStatement too))
-     defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance));
- }
-
- /** Tells whether at least one of the column rows has a type that deserializes to the given kind. */
- private static boolean hasKind(Iterable<UntypedResultSet.Row> defs, ColumnDefinition.Kind kind)
- {
-     for (UntypedResultSet.Row definition : defs)
-     {
-         ColumnDefinition.Kind definitionKind = deserializeKind(definition.getString("type"));
-         if (definitionKind == kind)
-             return true;
-     }
-
-     return false;
- }
-
- /*
-  * Prior to 3.0 we used to not store the type of the dropped columns, relying on all collection info being
-  * present in the comparator, forever. That allowed us to perform certain validations in AlterTableStatement
-  * (namely not allowing to re-add incompatible collection columns, with the same name, but a different type).
-  *
-  * In 3.0, we no longer preserve the original comparator, and reconstruct it from the columns instead. That means
-  * that we should preserve the type of the dropped columns now, and, during migration, fetch the types from
-  * the original comparator if necessary.
-  *
-  * Hence: a dropped column gets the collection type the last comparator component defines for its name, if
-  * any, and BytesType otherwise. All dropped columns are recorded as REGULAR.
-  */
- private static void addDroppedColumns(CFMetaData cfm, AbstractType<?> comparator, Map<String, Long> droppedTimes)
- {
-     AbstractType<?> lastComponent = comparator.getComponents().get(comparator.componentsCount() - 1);
-
-     Map<ByteBuffer, CollectionType> definedCollections;
-     if (lastComponent instanceof ColumnToCollectionType)
-         definedCollections = ((ColumnToCollectionType) lastComponent).defined;
-     else
-         definedCollections = Collections.emptyMap();
-
-     for (Map.Entry<String, Long> dropped : droppedTimes.entrySet())
-     {
-         String columnName = dropped.getKey();
-         ByteBuffer columnNameBytes = UTF8Type.instance.decompose(columnName);
-         long droppedTime = dropped.getValue();
-
-         AbstractType<?> columnType;
-         if (definedCollections.containsKey(columnNameBytes))
-             columnType = definedCollections.get(columnNameBytes);
-         else
-             columnType = BytesType.instance;
-
-         CFMetaData.DroppedColumn droppedColumn =
-             new CFMetaData.DroppedColumn(columnName, columnType, droppedTime, ColumnDefinition.Kind.REGULAR);
-         cfm.getDroppedColumns().put(columnNameBytes, droppedColumn);
-     }
- }
-
- /**
-  * Converts the legacy column rows into column definitions, in iteration order, leaving out the empty compact
-  * value column.
-  */
- private static List<ColumnDefinition> createColumnsFromColumnRows(Iterable<UntypedResultSet.Row> rows,
-                                                                   String keyspace,
-                                                                   String table,
-                                                                   AbstractType<?> rawComparator,
-                                                                   AbstractType<?> rawSubComparator,
-                                                                   boolean isSuper,
-                                                                   boolean isCQLTable,
-                                                                   boolean isStaticCompactTable,
-                                                                   boolean needsUpgrade)
- {
-     List<ColumnDefinition> definitions = new ArrayList<>();
-
-     for (UntypedResultSet.Row columnRow : rows)
-     {
-         // The empty compact value column is not converted: addDefinitionForUpgrade() re-adds the proper REGULAR one.
-         if (!isEmptyCompactValueColumn(columnRow))
-         {
-             ColumnDefinition definition = createColumnFromColumnRow(columnRow,
-                                                                     keyspace,
-                                                                     table,
-                                                                     rawComparator,
-                                                                     rawSubComparator,
-                                                                     isSuper,
-                                                                     isCQLTable,
-                                                                     isStaticCompactTable,
-                                                                     needsUpgrade);
-             definitions.add(definition);
-         }
-     }
-
-     return definitions;
- }
-
- /**
-  * Converts one legacy column row into a column definition: resolves its kind, its position (primary key
-  * columns only), its name (decoded through the appropriate comparator) and its frozen-adjusted type.
-  */
- private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
-                                                           String keyspace,
-                                                           String table,
-                                                           AbstractType<?> rawComparator,
-                                                           AbstractType<?> rawSubComparator,
-                                                           boolean isSuper,
-                                                           boolean isCQLTable,
-                                                           boolean isStaticCompactTable,
-                                                           boolean needsUpgrade)
- {
-     String rawKind = row.getString("type");
-
-     // regular columns of a static compact table that still needs the upgrade become static
-     ColumnDefinition.Kind declaredKind = deserializeKind(rawKind);
-     boolean promoteToStatic = needsUpgrade && isStaticCompactTable && declaredKind == ColumnDefinition.Kind.REGULAR;
-     ColumnDefinition.Kind kind = promoteToStatic ? ColumnDefinition.Kind.STATIC : declaredKind;
-
-     // Note that the component_index is not useful for non-primary key parts (it never really was, in fact, since
-     // there is no particular ordering of non-PK columns; we only used to use it as a simplification but that's
-     // not needed anymore).
-     // We used to not have a component index when there was a single partition key, we don't anymore (#10491).
-     int componentIndex;
-     if (!kind.isPrimaryKeyKind())
-         componentIndex = ColumnDefinition.NO_POSITION;
-     else if (row.has("component_index"))
-         componentIndex = row.getInt("component_index");
-     else
-         componentIndex = 0;
-
-     // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
-     // need to use the comparator fromString method
-     AbstractType<?> comparator = isCQLTable
-                                ? UTF8Type.instance
-                                : CompactTables.columnDefinitionComparator(rawKind, isSuper, rawComparator, rawSubComparator);
-     ByteBuffer nameBytes = comparator.fromString(row.getString("column_name"));
-     ColumnIdentifier name = ColumnIdentifier.getInterned(nameBytes, comparator);
-
-     // In the 2.x schema we didn't store UDT's with a FrozenType wrapper because they were implicitly frozen. After
-     // CASSANDRA-7423 (non-frozen UDTs), this is no longer true, so we need to freeze UDTs and nested freezable
-     // types (UDTs and collections) to properly migrate the schema. See CASSANDRA-11609 and CASSANDRA-11613.
-     AbstractType<?> parsedValidator = parseType(row.getString("validator"));
-     AbstractType<?> validator = parsedValidator.isUDT() && parsedValidator.isMultiCell()
-                               ? parsedValidator.freeze()
-                               : parsedValidator.freezeNestedMulticellTypes();
-
-     return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
- }
-
- /**
-  * Builds the index metadata of a table from its legacy column rows. Rows without an index_type are ignored;
-  * rows with an index_type but without an index_name are reported in the log and skipped.
-  */
- private static Indexes createIndexesFromColumnRows(CFMetaData cfm,
-                                                    Iterable<UntypedResultSet.Row> rows,
-                                                    String keyspace,
-                                                    String table,
-                                                    AbstractType<?> rawComparator,
-                                                    AbstractType<?> rawSubComparator,
-                                                    boolean isSuper,
-                                                    boolean isCQLTable,
-                                                    boolean isStaticCompactTable,
-                                                    boolean needsUpgrade)
- {
-     Indexes.Builder builder = Indexes.builder();
-
-     for (UntypedResultSet.Row columnRow : rows)
-     {
-         // not an indexed column
-         if (!columnRow.has("index_type"))
-             continue;
-
-         IndexMetadata.Kind indexKind = IndexMetadata.Kind.valueOf(columnRow.getString("index_type"));
-         if (indexKind == null)
-             continue;
-
-         Map<String, String> indexOptions = columnRow.has("index_options")
-                                          ? fromJsonMap(columnRow.getString("index_options"))
-                                          : null;
-
-         if (!columnRow.has("index_name"))
-         {
-             logger.error("Failed to find index name for legacy migration of index on {}.{}", keyspace, table);
-             continue;
-         }
-
-         String indexName = columnRow.getString("index_name");
-
-         ColumnDefinition indexedColumn = createColumnFromColumnRow(columnRow,
-                                                                    keyspace,
-                                                                    table,
-                                                                    rawComparator,
-                                                                    rawSubComparator,
-                                                                    isSuper,
-                                                                    isCQLTable,
-                                                                    isStaticCompactTable,
-                                                                    needsUpgrade);
-
-         builder.add(IndexMetadata.fromLegacyMetadata(cfm, indexedColumn, indexName, indexKind, indexOptions));
-     }
-
-     return builder.build();
- }
-
- /**
-  * Maps the legacy textual column kind to a {@code ColumnDefinition.Kind}: "clustering_key" becomes CLUSTERING,
-  * "compact_value" becomes REGULAR, and any other value is looked up as the enum constant of the same name,
-  * ignoring case.
-  *
-  * @param kind the value of the "type" column of a legacy column row
-  * @return the matching kind
-  * @throws IllegalArgumentException if the value matches no constant of {@code ColumnDefinition.Kind}
-  */
- private static ColumnDefinition.Kind deserializeKind(String kind)
- {
-     if ("clustering_key".equalsIgnoreCase(kind))
-         return ColumnDefinition.Kind.CLUSTERING;
-
-     if ("compact_value".equalsIgnoreCase(kind))
-         return ColumnDefinition.Kind.REGULAR;
-
-     // Upper-case with a fixed locale (as done when decoding the table "type"): with the default locale, e.g.
-     // Turkish, 'i' upper-cases to a dotted capital I and "static" or "partition_key" would not match any constant.
-     return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase(Locale.ENGLISH));
- }
-
- /** Converts every legacy trigger row into trigger metadata and collects the results. */
- private static Triggers createTriggersFromTriggerRows(UntypedResultSet rows)
- {
-     Triggers.Builder builder = org.apache.cassandra.schema.Triggers.builder();
-     for (UntypedResultSet.Row triggerRow : rows)
-         builder.add(createTriggerFromTriggerRow(triggerRow));
-     return builder.build();
- }
-
- /** Builds trigger metadata from the row's trigger_name and the "class" entry of its trigger_options map. */
- private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
- {
-     String triggerName = row.getString("trigger_name");
-     Map<String, String> triggerOptions = row.getTextMap("trigger_options");
-     return new TriggerMetadata(triggerName, triggerOptions.get("class"));
- }
-
- /*
- * Reading user types
- */
-
- /** Reads every user type of the keyspace that is listed in the legacy usertypes table. */
- private static Collection<Type> readTypes(String keyspaceName)
- {
-     String cql = format("SELECT type_name FROM %s.%s WHERE keyspace_name = ?",
-                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                         SystemKeyspace.LEGACY_USERTYPES);
-
-     // collect all the names first, then read the types one by one
-     Collection<String> names = new ArrayList<>();
-     for (UntypedResultSet.Row row : query(cql, keyspaceName))
-         names.add(row.getString("type_name"));
-
-     Collection<Type> result = new ArrayList<>();
-     for (String name : names)
-         result.add(readType(keyspaceName, name));
-     return result;
- }
-
- /** Reads the timestamp, then the metadata, of a single legacy user type. */
- private static Type readType(String keyspaceName, String typeName)
- {
-     return new Type(readTypeTimestamp(keyspaceName, typeName),
-                     readTypeMetadata(keyspaceName, typeName));
- }
-
- /*
- * Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
- * use the writeTime() CQL function, and must resort to a lower level.
- */
- private static long readTypeTimestamp(String keyspaceName, String typeName)
- {
- ColumnFamilyStore store = org.apache.cassandra.db.Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
- .getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES);
-
- ClusteringComparator comparator = store.metadata.comparator;
- Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
- int nowInSec = FBUtilities.nowInSeconds();
- DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName));
- SinglePartitionReadCommand command = SinglePartitionReadCommand.create(store.metadata, nowInSec, key, slices);
-
- try (ReadExecutionController controller = command.executionController();
- RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, controller), nowInSec))
- {
- return partition.next().primaryKeyLivenessInfo().timestamp();
- }
- }
-
- private static UserType readTypeMetadata(String keyspaceName, String typeName)
- {
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND type_name = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_USERTYPES);
- UntypedResultSet.Row row = query(query, keyspaceName, typeName).one();
-
- List<FieldIdentifier> names =
- row.getList("field_names", UTF8Type.instance)
- .stream()
- .map(t -> FieldIdentifier.forInternalString(t))
- .collect(Collectors.toList());
-
- List<AbstractType<?>> types =
- row.getList("field_types", UTF8Type.instance)
- .stream()
- .map(LegacySchemaMigrator::parseType)
- .collect(Collectors.toList());
-
- return new UserType(keyspaceName, bytes(typeName), names, types, true);
- }
-
- /*
- * Reading UDFs
- */
-
- private static Collection<Function> readFunctions(String keyspaceName)
- {
- String query = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_FUNCTIONS);
- HashMultimap<String, List<String>> functionSignatures = HashMultimap.create();
- query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance)));
-
- Collection<Function> functions = new ArrayList<>();
- functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue())));
- return functions;
- }
-
- private static Function readFunction(String keyspaceName, String functionName, List<String> signature)
- {
- long timestamp = readFunctionTimestamp(keyspaceName, functionName, signature);
- UDFunction metadata = readFunctionMetadata(keyspaceName, functionName, signature);
- return new Function(timestamp, metadata);
- }
-
- private static long readFunctionTimestamp(String keyspaceName, String functionName, List<String> signature)
- {
- String query = format("SELECT writeTime(return_type) AS timestamp " +
- "FROM %s.%s " +
- "WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_FUNCTIONS);
- return query(query, keyspaceName, functionName, signature).one().getLong("timestamp");
- }
-
- private static UDFunction readFunctionMetadata(String keyspaceName, String functionName, List<String> signature)
- {
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_FUNCTIONS);
- UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
-
- FunctionName name = new FunctionName(keyspaceName, functionName);
-
- List<ColumnIdentifier> argNames = new ArrayList<>();
- if (row.has("argument_names"))
- for (String arg : row.getList("argument_names", UTF8Type.instance))
- argNames.add(new ColumnIdentifier(arg, true));
-
- List<AbstractType<?>> argTypes = new ArrayList<>();
- if (row.has("argument_types"))
- for (String type : row.getList("argument_types", UTF8Type.instance))
- argTypes.add(parseType(type));
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
-
- String language = row.getString("language");
- String body = row.getString("body");
- boolean calledOnNullInput = row.getBoolean("called_on_null_input");
-
- try
- {
- return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
- }
- catch (InvalidRequestException e)
- {
- return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e);
- }
- }
-
- /*
- * Reading UDAs
- */
-
- private static Collection<Aggregate> readAggregates(Functions functions, String keyspaceName)
- {
- String query = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_AGGREGATES);
- HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create();
- query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance)));
-
- Collection<Aggregate> aggregates = new ArrayList<>();
- aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(functions, keyspaceName, pair.getKey(), pair.getValue())));
- return aggregates;
- }
-
- private static Aggregate readAggregate(Functions functions, String keyspaceName, String aggregateName, List<String> signature)
- {
- long timestamp = readAggregateTimestamp(keyspaceName, aggregateName, signature);
- UDAggregate metadata = readAggregateMetadata(functions, keyspaceName, aggregateName, signature);
- return new Aggregate(timestamp, metadata);
- }
-
- private static long readAggregateTimestamp(String keyspaceName, String aggregateName, List<String> signature)
- {
- String query = format("SELECT writeTime(return_type) AS timestamp " +
- "FROM %s.%s " +
- "WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_AGGREGATES);
- return query(query, keyspaceName, aggregateName, signature).one().getLong("timestamp");
- }
-
- private static UDAggregate readAggregateMetadata(Functions functions, String keyspaceName, String functionName, List<String> signature)
- {
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.LEGACY_AGGREGATES);
- UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
-
- FunctionName name = new FunctionName(keyspaceName, functionName);
-
- List<String> types = row.getList("argument_types", UTF8Type.instance);
-
- List<AbstractType<?>> argTypes = new ArrayList<>();
- if (types != null)
- {
- argTypes = new ArrayList<>(types.size());
- for (String type : types)
- argTypes.add(parseType(type));
- }
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
-
- FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func"));
- AbstractType<?> stateType = parseType(row.getString("state_type"));
- FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName, row.getString("final_func")) : null;
- ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
-
- try
- {
- return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
- }
- catch (InvalidRequestException reason)
- {
- return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
- }
- }
-
- private static UntypedResultSet query(String query, Object... values)
- {
- return QueryProcessor.executeOnceInternal(query, values);
- }
-
- private static AbstractType<?> parseType(String str)
- {
- return TypeParser.parse(str);
- }
-
- private static final class Keyspace
- {
- final long timestamp;
- final String name;
- final KeyspaceParams params;
- final Collection<Table> tables;
- final Collection<Type> types;
- final Collection<Function> functions;
- final Collection<Aggregate> aggregates;
-
- Keyspace(long timestamp,
- String name,
- KeyspaceParams params,
- Collection<Table> tables,
- Collection<Type> types,
- Collection<Function> functions,
- Collection<Aggregate> aggregates)
- {
- this.timestamp = timestamp;
- this.name = name;
- this.params = params;
- this.tables = tables;
- this.types = types;
- this.functions = functions;
- this.aggregates = aggregates;
- }
- }
-
- private static final class Table
- {
- final long timestamp;
- final CFMetaData metadata;
-
- Table(long timestamp, CFMetaData metadata)
- {
- this.timestamp = timestamp;
- this.metadata = metadata;
- }
- }
-
- private static final class Type
- {
- final long timestamp;
- final UserType metadata;
-
- Type(long timestamp, UserType metadata)
- {
- this.timestamp = timestamp;
- this.metadata = metadata;
- }
- }
-
- private static final class Function
- {
- final long timestamp;
- final UDFunction metadata;
-
- Function(long timestamp, UDFunction metadata)
- {
- this.timestamp = timestamp;
- this.metadata = metadata;
- }
- }
-
- private static final class Aggregate
- {
- final long timestamp;
- final UDAggregate metadata;
-
- Aggregate(long timestamp, UDAggregate metadata)
- {
- this.timestamp = timestamp;
- this.metadata = metadata;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7aa926e..8944b7c 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -106,7 +106,7 @@ public abstract class AbstractReadExecutor
if (traceState != null)
traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
- MessageOut<ReadCommand> message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+ MessageOut<ReadCommand> message = readCommand.createMessage();
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
@@ -291,8 +291,7 @@ public abstract class AbstractReadExecutor
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
- int version = MessagingService.instance().getVersion(extraReplica);
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
speculated = true;
cfs.metric.speculativeRetries.inc();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a4e18c0..54fa7e2 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -454,10 +454,6 @@ public class CacheService implements CacheServiceMBean
{
public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
{
- //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity
- //https://issues.apache.org/jira/browse/CASSANDRA-10778
- if (!key.desc.version.storeRows()) return;
-
RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
if (entry == null)
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..b41cc00 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,7 +46,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -59,14 +58,12 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.hints.LegacyHintsMigrator;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.schema.LegacySchemaMigrator;
import org.apache.cassandra.thrift.ThriftServer;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
@@ -205,18 +202,6 @@ public class CassandraDaemon
exitOrFail(e.returnCode, e.getMessage(), e.getCause());
}
- try
- {
- if (SystemKeyspace.snapshotOnVersionChange())
- {
- SystemKeyspace.migrateDataDirs();
- }
- }
- catch (IOException e)
- {
- exitOrFail(3, e.getMessage(), e.getCause());
- }
-
// We need to persist this as soon as possible after startup checks.
// This should be the first write to SystemKeyspace (CASSANDRA-11742)
SystemKeyspace.persistLocalMetadata();
@@ -249,13 +234,6 @@ public class CassandraDaemon
}
});
- /*
- * Migrate pre-3.0 keyspaces, tables, types, functions, and aggregates, to their new 3.0 storage.
- * We don't (and can't) wait for commit log replay here, but we don't need to - all schema changes force
- * explicit memtable flushes.
- */
- LegacySchemaMigrator.migrate();
-
// Populate token metadata before flushing, for token-aware sstable partitioning (#6696)
StorageService.instance.populateTokenMetadata();
@@ -333,12 +311,6 @@ public class CassandraDaemon
// Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293)
StorageService.instance.populateTokenMetadata();
- // migrate any legacy (pre-3.0) hints from system.hints table into the new store
- new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
-
- // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
- LegacyBatchlogMigrator.migrate();
-
// enable auto compaction
for (Keyspace keyspace : Keyspace.all())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index be8eca1..48ad2c6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -512,7 +512,7 @@ public class DataResolver extends ResponseResolver
if (StorageProxy.canDoLocalRequest(source))
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 11c0b12..6e0fadb 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -247,10 +247,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
for (InetAddress endpoint : endpoints)
- {
- MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
- MessagingService.instance().sendRR(message, endpoint, repairHandler);
- }
+ MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 83971dd..75f7788 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -259,14 +259,15 @@ public class StartupChecks
FileVisitor<Path> sstableVisitor = new SimpleFileVisitor<Path>()
{
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+ public FileVisitResult visitFile(Path path, BasicFileAttributes attrs)
{
- if (!Descriptor.isValidFile(file.getFileName().toString()))
+ File file = path.toFile();
+ if (!Descriptor.isValidFile(file))
return FileVisitResult.CONTINUE;
try
{
- if (!Descriptor.fromFilename(file.toString()).isCompatible())
+ if (!Descriptor.fromFilename(file).isCompatible())
invalid.add(file.toString());
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0be68c..77862d6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -909,10 +908,10 @@ public class StorageProxy implements StorageProxyMBean
batchConsistencyLevel = consistency_level;
}
- final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+ final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
final UUID batchUUID = UUIDGen.getTimeUUID();
BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
- () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime));
+ () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
@@ -969,33 +968,19 @@ public class StorageProxy implements StorageProxyMBean
return replica.equals(FBUtilities.getBroadcastAddress());
}
- private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime)
throws WriteTimeoutException, WriteFailureException
{
- WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+ WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints,
Collections.<InetAddress>emptyList(),
- endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+ endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME),
null,
WriteType.BATCH_LOG,
queryStartNanoTime);
Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
-
- if (!endpoints.current.isEmpty())
- syncWriteToBatchlog(handler, batch, endpoints.current);
-
- if (!endpoints.legacy.isEmpty())
- LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
-
- handler.get();
- }
-
- private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
- throws WriteTimeoutException, WriteFailureException
- {
MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-
for (InetAddress target : endpoints)
{
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
@@ -1005,15 +990,7 @@ public class StorageProxy implements StorageProxyMBean
else
MessagingService.instance().sendRR(message, target, handler);
}
- }
-
- private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
- {
- if (!endpoints.current.isEmpty())
- asyncRemoveFromBatchlog(endpoints.current, uuid);
-
- if (!endpoints.legacy.isEmpty())
- LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime);
+ handler.get();
}
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
@@ -1160,38 +1137,13 @@ public class StorageProxy implements StorageProxyMBean
}
/*
- * A class to filter batchlog endpoints into legacy endpoints (version < 3.0) or not.
- */
- private static final class BatchlogEndpoints
- {
- public final Collection<InetAddress> all;
- public final Collection<InetAddress> current;
- public final Collection<InetAddress> legacy;
-
- BatchlogEndpoints(Collection<InetAddress> endpoints)
- {
- all = endpoints;
- current = new ArrayList<>(2);
- legacy = new ArrayList<>(2);
-
- for (InetAddress ep : endpoints)
- {
- if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30)
- current.add(ep);
- else
- legacy.add(ep);
- }
- }
- }
-
- /*
* Replicas are picked manually:
* - replicas should be alive according to the failure detector
* - replicas should be in the local datacenter
* - choose min(2, number of qualifying candiates above)
* - allow the local node to be the only replica only if it's a single-node DC
*/
- private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+ private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
throws UnavailableException
{
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1202,12 +1154,12 @@ public class StorageProxy implements StorageProxyMBean
if (chosenEndpoints.isEmpty())
{
if (consistencyLevel == ConsistencyLevel.ANY)
- return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress()));
+ return Collections.singleton(FBUtilities.getBroadcastAddress());
throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
}
- return new BatchlogEndpoints(chosenEndpoints);
+ return chosenEndpoints;
}
/**
@@ -1816,9 +1768,8 @@ public class StorageProxy implements StorageProxyMBean
for (InetAddress endpoint : executor.getContactedReplicas())
{
- MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
Tracing.trace("Enqueuing full data read to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
+ MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, repairHandler);
}
}
}
@@ -2218,9 +2169,8 @@ public class StorageProxy implements StorageProxyMBean
{
for (InetAddress endpoint : toQuery.filteredEndpoints)
{
- MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
Tracing.trace("Enqueuing request to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+ MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), endpoint, handler);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 07eb1d8..62efed2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -257,12 +257,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName);
+ ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler();
+
/* register the verb handlers */
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, readHandler);
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, readHandler);
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, readHandler);
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
@@ -2082,8 +2084,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public boolean isRpcReady(InetAddress endpoint)
{
- return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 ||
- Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
+ return Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
}
public void setRpcReady(boolean value)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index af94869..3b0364c 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -113,32 +113,20 @@ public class Commit
{
public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException
{
- if (version < MessagingService.VERSION_30)
- ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out);
-
UUIDSerializer.serializer.serialize(commit.ballot, out, version);
PartitionUpdate.serializer.serialize(commit.update, out, version);
}
public Commit deserialize(DataInputPlus in, int version) throws IOException
{
- ByteBuffer key = null;
- if (version < MessagingService.VERSION_30)
- key = ByteBufferUtil.readWithShortLength(in);
-
UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
- PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key);
+ PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL);
return new Commit(ballot, update);
}
public long serializedSize(Commit commit, int version)
{
- int size = 0;
- if (version < MessagingService.VERSION_30)
- size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey());
-
- return size
- + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
+ return UUIDSerializer.serializer.serializedSize(commit.ballot, version)
+ PartitionUpdate.serializer.serializedSize(commit.update, version);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index f843b8d..d8699c8 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -69,51 +69,22 @@ public class PrepareResponse
{
out.writeBoolean(response.promised);
Commit.serializer.serialize(response.inProgressCommit, out, version);
-
- if (version < MessagingService.VERSION_30)
- {
- UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
- PartitionUpdate.serializer.serialize(response.mostRecentCommit.update, out, version);
- }
- else
- {
- Commit.serializer.serialize(response.mostRecentCommit, out, version);
- }
+ Commit.serializer.serialize(response.mostRecentCommit, out, version);
}
public PrepareResponse deserialize(DataInputPlus in, int version) throws IOException
{
boolean success = in.readBoolean();
Commit inProgress = Commit.serializer.deserialize(in, version);
- Commit mostRecent;
- if (version < MessagingService.VERSION_30)
- {
- UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
- PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, inProgress.update.partitionKey());
- mostRecent = new Commit(ballot, update);
- }
- else
- {
- mostRecent = Commit.serializer.deserialize(in, version);
- }
+ Commit mostRecent = Commit.serializer.deserialize(in, version);
return new PrepareResponse(success, inProgress, mostRecent);
}
public long serializedSize(PrepareResponse response, int version)
{
- long size = TypeSizes.sizeof(response.promised)
- + Commit.serializer.serializedSize(response.inProgressCommit, version);
-
- if (version < MessagingService.VERSION_30)
- {
- size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version);
- size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version);
- }
- else
- {
- size += Commit.serializer.serializedSize(response.mostRecentCommit, version);
- }
- return size;
+ return TypeSizes.sizeof(response.promised)
+ + Commit.serializer.serializedSize(response.inProgressCommit, version)
+ + Commit.serializer.serializedSize(response.mostRecentCommit, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 6465bf7..fab9372 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -196,16 +196,7 @@ public class StreamReader
long totalSize, UUID sessionId) throws IOException
{
this.metadata = metadata;
- // streaming pre-3.0 sstables require mark/reset support from source stream
- if (version.correspondingMessagingVersion() < MessagingService.VERSION_30)
- {
- logger.trace("Initializing rewindable input stream for reading legacy sstable with {} bytes with following " +
- "parameters: initial_mem_buffer_size={}, max_mem_buffer_size={}, max_spill_file_size={}.",
- totalSize, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, MAX_SPILL_FILE_SIZE);
- File bufferFile = getTempBufferFile(metadata, totalSize, sessionId);
- this.in = new RewindableDataInputStreamPlus(in, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, bufferFile, MAX_SPILL_FILE_SIZE);
- } else
- this.in = new DataInputPlus.DataInputStreamPlus(in);
+ this.in = new DataInputPlus.DataInputStreamPlus(in);
this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
this.header = header;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 70b5765..2044d4d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -81,7 +82,7 @@ public class CompressedStreamReader extends StreamReader
cfs.getColumnFamilyName());
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
- inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
+ ChecksumType.CRC32, cfs::getCrcCheckChance);
TrackedInputStream in = new TrackedInputStream(cis);
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 232727d..b0639ea 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -189,13 +189,7 @@ public class FileMessageHeader
UUIDSerializer.serializer.serialize(header.cfId, out, version);
out.writeInt(header.sequenceNumber);
out.writeUTF(header.version.toString());
-
- //We can't stream to a node that doesn't understand a new sstable format
- if (version < StreamMessage.VERSION_22 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG)
- throw new UnsupportedOperationException("Can't stream non-legacy sstables to nodes < 2.2");
-
- if (version >= StreamMessage.VERSION_22)
- out.writeUTF(header.format.name);
+ out.writeUTF(header.format.name);
out.writeLong(header.estimatedKeys);
out.writeInt(header.sections.size());
@@ -212,8 +206,7 @@ public class FileMessageHeader
out.writeLong(header.repairedAt);
out.writeInt(header.sstableLevel);
- if (version >= StreamMessage.VERSION_30 && header.version.storeRows())
- SerializationHeader.serializer.serialize(header.version, header.header, out);
+ SerializationHeader.serializer.serialize(header.version, header.header, out);
return compressionInfo;
}
@@ -222,10 +215,7 @@ public class FileMessageHeader
UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
int sequenceNumber = in.readInt();
Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
-
- SSTableFormat.Type format = SSTableFormat.Type.LEGACY;
- if (version >= StreamMessage.VERSION_22)
- format = SSTableFormat.Type.validate(in.readUTF());
+ SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
long estimatedKeys = in.readLong();
int count = in.readInt();
@@ -235,9 +225,7 @@ public class FileMessageHeader
CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
long repairedAt = in.readLong();
int sstableLevel = in.readInt();
- SerializationHeader.Component header = version >= StreamMessage.VERSION_30 && sstableVersion.storeRows()
- ? SerializationHeader.serializer.deserialize(sstableVersion, in)
- : null;
+ SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
}
@@ -247,10 +235,7 @@ public class FileMessageHeader
long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
size += TypeSizes.sizeof(header.sequenceNumber);
size += TypeSizes.sizeof(header.version.toString());
-
- if (version >= StreamMessage.VERSION_22)
- size += TypeSizes.sizeof(header.format.name);
-
+ size += TypeSizes.sizeof(header.format.name);
size += TypeSizes.sizeof(header.estimatedKeys);
size += TypeSizes.sizeof(header.sections.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 7487aaf..3ce1958 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,8 +33,6 @@ import org.apache.cassandra.streaming.StreamSession;
public abstract class StreamMessage
{
/** Streaming protocol version */
- public static final int VERSION_20 = 2;
- public static final int VERSION_22 = 3;
public static final int VERSION_30 = 4;
public static final int CURRENT_VERSION = VERSION_30;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 070434d..52d5ecf 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -93,8 +93,8 @@ public class SSTableExport
*/
public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException
{
- if (!desc.version.storeRows())
- throw new IOException("pre-3.0 SSTable is not supported.");
+ if (!desc.version.isCompatible())
+ throw new IOException("Cannot process old and unsupported SSTable version.");
EnumSet<MetadataType> types = EnumSet.of(MetadataType.STATS, MetadataType.HEADER);
Map<MetadataType, MetadataComponent> sstableMetadata = desc.getMetadataSerializer().deserialize(desc, types);
@@ -162,11 +162,6 @@ public class SSTableExport
: cmd.getOptionValues(EXCLUDE_KEY_OPTION)));
String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath();
- if (Descriptor.isLegacyFile(new File(ssTableFileName)))
- {
- System.err.println("Unsupported legacy sstable");
- System.exit(1);
- }
if (!new File(ssTableFileName).exists())
{
System.err.println("Cannot find file " + ssTableFileName);