You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ed...@apache.org on 2021/03/11 02:50:40 UTC

[cassandra] branch trunk updated (0e990d7 -> c7432e9)

This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 0e990d7  Merge branch 'cassandra-3.11' into trunk
     add efde6a7  Refuse DROP COMPACT STORAGE if some 2.x sstables are in use patch by Sylvain Lebresne; review changes and tests by Ekaterina Dimitrova; reviewed by Benjamin Lerer and Brandon Williams for CASSANDRA-15897
     add b654000  Merge branch 'cassandra-3.0' into cassandra-3.11
     new c7432e9  Merge branch 'cassandra-3.11' into trunk

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 .../statements/schema/AlterTableStatement.java     |  93 ++++++-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   4 +
 .../db/compaction/CompactionStrategyManager.java   |   5 +-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  61 +++--
 .../org/apache/cassandra/gms/ApplicationState.java |  17 ++
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  42 ++-
 .../org/apache/cassandra/gms/VersionedValue.java   |  12 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |   7 +
 .../cassandra/io/sstable/format/Version.java       |   1 -
 .../io/sstable/format/VersionAndType.java          |  93 +++++++
 ...n.java => InitialSSTableAddedNotification.java} |  13 +-
 src/java/org/apache/cassandra/schema/Schema.java   |   3 +-
 .../cassandra/schema/SchemaTransformation.java     |   4 +-
 .../cassandra/service/SSTablesGlobalTracker.java   | 284 +++++++++++++++++++++
 .../SSTablesVersionsInUseChangeNotification.java   |  50 ++++
 .../apache/cassandra/service/StartupChecks.java    |   5 +-
 .../apache/cassandra/service/StorageService.java   |  19 ++
 .../upgrade/CompactStorage3to4UpgradeTest.java     |   4 +
 .../validation/operations/CompactTableTest.java    |   1 -
 .../apache/cassandra/db/lifecycle/TrackerTest.java |  31 ++-
 .../io/sstable/format/VersionAndTypeTest.java      |  51 ++++
 .../service/SSTablesGlobalTrackerTest.java         | 158 ++++++++++++
 25 files changed, 891 insertions(+), 72 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/io/sstable/format/VersionAndType.java
 copy src/java/org/apache/cassandra/notifications/{SSTableDeletingNotification.java => InitialSSTableAddedNotification.java} (65%)
 create mode 100644 src/java/org/apache/cassandra/service/SSTablesGlobalTracker.java
 create mode 100644 src/java/org/apache/cassandra/service/SSTablesVersionsInUseChangeNotification.java
 create mode 100644 test/unit/org/apache/cassandra/io/sstable/format/VersionAndTypeTest.java
 create mode 100644 test/unit/org/apache/cassandra/service/SSTablesGlobalTrackerTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ed...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c7432e98a381c16cb63692e727c35e2909d7200a
Merge: 0e990d7 b654000
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Wed Mar 10 20:54:07 2021 -0500

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 .../statements/schema/AlterTableStatement.java     |  93 ++++++-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   4 +
 .../db/compaction/CompactionStrategyManager.java   |   5 +-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  61 +++--
 .../org/apache/cassandra/gms/ApplicationState.java |  17 ++
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |   2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  42 ++-
 .../org/apache/cassandra/gms/VersionedValue.java   |  12 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |   7 +
 .../cassandra/io/sstable/format/Version.java       |   1 -
 .../io/sstable/format/VersionAndType.java          |  93 +++++++
 .../InitialSSTableAddedNotification.java}          |  27 +-
 src/java/org/apache/cassandra/schema/Schema.java   |   3 +-
 .../cassandra/schema/SchemaTransformation.java     |   4 +-
 .../cassandra/service/SSTablesGlobalTracker.java   | 284 +++++++++++++++++++++
 .../SSTablesVersionsInUseChangeNotification.java   |  50 ++++
 .../apache/cassandra/service/StartupChecks.java    |   5 +-
 .../apache/cassandra/service/StorageService.java   |  19 ++
 .../upgrade/CompactStorage3to4UpgradeTest.java     |   4 +
 .../validation/operations/CompactTableTest.java    |   1 -
 .../apache/cassandra/db/lifecycle/TrackerTest.java |  31 ++-
 .../io/sstable/format/VersionAndTypeTest.java      |  51 ++++
 .../service/SSTablesGlobalTrackerTest.java         | 158 ++++++++++++
 25 files changed, 899 insertions(+), 78 deletions(-)

diff --cc CHANGES.txt
index ffec3cb,1e74bf6..592c9c3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,10 +1,51 @@@
 -3.11.11
 +4.0-beta5
 + * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup (CASSANDRA-16425)
 + * Reinstate removed ApplicationState padding (CASSANDRA-16484)
 + * Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335)
 + * Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
 + * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
 + * Fix cassandra-stress JMX connection (CASSANDRA-16473)
 + * Avoid processing redundant application states on endpoint changes (CASSANDRA-16381)
 + * Prevent parent repair sessions leak (CASSANDRA-16446)
 + * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionD…eletionRowDeletionTie (CASSANDRA-16443)
 + * Promote protocol V5 out of beta (CASSANDRA-14973)
 + * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
 + * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296)
 + * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344) 
 + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422)
 + * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083)
 + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439)
 + * Improve system tables handling in case of disk failures (CASSANDRA-14793)
 + * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
 + * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283)
 + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
 + * Move excessive repair debug loggings to trace level (CASSANDRA-16406)
 + * Restore validation of each message's protocol version (CASSANDRA-16374)
 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
 + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834)
 + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303)
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 +Merged from 3.11:
   * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 + * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962)
 + * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
 + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
  Merged from 3.0:
+  * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use (CASSANDRA-15897)
   * Fix ColumnFilter::toString not returning a valid CQL fragment (CASSANDRA-16483)
   * Fix ColumnFilter behaviour to prevent digest mitmatches during upgrades (CASSANDRA-16415)
 + * Update debian packaging for python3 (CASSANDRA-16396)
   * Avoid pushing schema mutations when setting up distributed system keyspaces locally (CASSANDRA-16387)
 + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
 + * Improve empty hint file handling during startup (CASSANDRA-16162)
 + * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
 + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
  Merged from 2.2:
   * Fix centos packaging for arm64, >=4.0 rpm's now require python3 (CASSANDRA-16477)
   * Make TokenMetadata's ring version increments atomic (CASSANDRA-16286)
diff --cc build.xml
index 36db9e5,5726fed..ecb1ad8
--- a/build.xml
+++ b/build.xml
@@@ -586,14 -412,21 +586,15 @@@
            <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/>
 -
 -          <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
 -          <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2">
 -	         <exclusion groupId="commons-logging" artifactId="commons-logging"/>
 -          </dependency>
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/>
 +          <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.26"/>
            <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.26" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
            <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.7" />
            <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" />
+           <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
@@@ -667,22 -484,11 +668,21 @@@
            <dependency groupId="de.jflex" artifactId="jflex" version="1.6.0" />
            <dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" />
            <dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" />
 -          <dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" />
 -          <dependency groupId="org.jctools" artifactId="jctools-core" version="1.2.1"/>
 -          <dependency groupId="org.ow2.asm" artifactId="asm" version="5.0.4" />
 +          <dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.3.5" />
 +          <dependency groupId="org.jctools" artifactId="jctools-core" version="3.1.0"/>
 +          <dependency groupId="org.ow2.asm" artifactId="asm" version="${asm.version}" />
 +          <dependency groupId="org.ow2.asm" artifactId="asm-tree" version="${asm.version}" />
 +          <dependency groupId="org.ow2.asm" artifactId="asm-commons" version="${asm.version}" />
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-cli" version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-core" version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-stacktrace" version="0.14"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="mxdump" version="0.14"/>
 +          <dependency groupId="org.gridkit.lab" artifactId="jvm-attach-api" version="1.5"/>
 +          <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-json" version="0.14"/>
 +          <dependency groupId="com.beust" artifactId="jcommander" version="1.30"/>
            <!-- when updating assertj, make sure to also update the corresponding junit-bom dependency -->
            <dependency groupId="org.assertj" artifactId="assertj-core" version="3.15.0"/>
 +          <dependency groupId="org.awaitility" artifactId="awaitility" version="4.0.3" />
- 
          </dependencyManagement>
          <developer id="adelapena" name="Andres de la Peña"/>
          <developer id="alakshman" name="Avinash Lakshman"/>
diff --cc src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index b6cab44,0000000..0820073
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@@ -1,506 -1,0 +1,593 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements.schema;
 +
++import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
++import java.util.concurrent.TimeUnit;
 +
++import com.google.common.base.Splitter;
 +import com.google.common.collect.ImmutableSet;
 +
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
 +import org.apache.cassandra.audit.AuditLogContext;
 +import org.apache.cassandra.audit.AuditLogEntryType;
 +import org.apache.cassandra.auth.Permission;
 +
 +import org.apache.cassandra.cql3.CQL3Type;
 +import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QualifiedName;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +
++import org.apache.cassandra.exceptions.InvalidRequestException;
++import org.apache.cassandra.gms.ApplicationState;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.Keyspaces;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableParams;
 +import org.apache.cassandra.schema.ViewMetadata;
 +import org.apache.cassandra.schema.Views;
 +import org.apache.cassandra.service.ClientState;
++import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 +import org.apache.cassandra.transport.Event.SchemaChange;
 +import org.apache.cassandra.transport.Event.SchemaChange.Change;
 +import org.apache.cassandra.transport.Event.SchemaChange.Target;
++import org.apache.cassandra.utils.NoSpamLogger;
 +
++import static java.lang.String.format;
 +import static java.lang.String.join;
 +
 +import static com.google.common.collect.Iterables.isEmpty;
 +import static com.google.common.collect.Iterables.transform;
 +
 +public abstract class AlterTableStatement extends AlterSchemaStatement
 +{
 +    protected final String tableName;
 +
 +    public AlterTableStatement(String keyspaceName, String tableName)
 +    {
 +        super(keyspaceName);
 +        this.tableName = tableName;
 +    }
 +
-     public Keyspaces apply(Keyspaces schema)
++    public Keyspaces apply(Keyspaces schema) throws UnknownHostException
 +    {
 +        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
 +
 +        TableMetadata table = null == keyspace
 +                            ? null
 +                            : keyspace.getTableOrViewNullable(tableName);
 +
 +        if (null == table)
 +            throw ire("Table '%s.%s' doesn't exist", keyspaceName, tableName);
 +
 +        if (table.isView())
 +            throw ire("Cannot use ALTER TABLE on a materialized view; use ALTER MATERIALIZED VIEW instead");
 +
 +        return schema.withAddedOrUpdated(apply(keyspace, table));
 +    }
 +
 +    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
 +    {
 +        return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, tableName);
 +    }
 +
 +    public void authorize(ClientState client)
 +    {
 +        client.ensureTablePermission(keyspaceName, tableName, Permission.ALTER);
 +    }
 +
 +    @Override
 +    public AuditLogContext getAuditLogContext()
 +    {
 +        return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, keyspaceName, tableName);
 +    }
 +
 +    public String toString()
 +    {
-         return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName);
++        return format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName);
 +    }
 +
-     abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table);
++    abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) throws UnknownHostException;
 +
 +    /**
 +     * ALTER TABLE <table> ALTER <column> TYPE <newtype>;
 +     *
 +     * No longer supported.
 +     */
 +    public static class AlterColumn extends AlterTableStatement
 +    {
 +        AlterColumn(String keyspaceName, String tableName)
 +        {
 +            super(keyspaceName, tableName);
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            throw ire("Altering column types is no longer supported");
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> ADD <column> <newtype>
 +     * ALTER TABLE <table> ADD (<column> <newtype>, <column1> <newtype1>, ... <columnn> <newtypen>)
 +     */
 +    private static class AddColumns extends AlterTableStatement
 +    {
 +        private static class Column
 +        {
 +            private final ColumnIdentifier name;
 +            private final CQL3Type.Raw type;
 +            private final boolean isStatic;
 +
 +            Column(ColumnIdentifier name, CQL3Type.Raw type, boolean isStatic)
 +            {
 +                this.name = name;
 +                this.type = type;
 +                this.isStatic = isStatic;
 +            }
 +        }
 +
 +        private final Collection<Column> newColumns;
 +
 +        private AddColumns(String keyspaceName, String tableName, Collection<Column> newColumns)
 +        {
 +            super(keyspaceName, tableName);
 +            this.newColumns = newColumns;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            TableMetadata.Builder tableBuilder = table.unbuild();
 +            Views.Builder viewsBuilder = keyspace.views.unbuild();
 +            newColumns.forEach(c -> addColumn(keyspace, table, c, tableBuilder, viewsBuilder));
 +
 +            return keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
 +                           .withSwapped(viewsBuilder.build());
 +        }
 +
 +        private void addColumn(KeyspaceMetadata keyspace,
 +                               TableMetadata table,
 +                               Column column,
 +                               TableMetadata.Builder tableBuilder,
 +                               Views.Builder viewsBuilder)
 +        {
 +            ColumnIdentifier name = column.name;
 +            AbstractType<?> type = column.type.prepare(keyspaceName, keyspace.types).getType();
 +            boolean isStatic = column.isStatic;
 +
 +            if (null != tableBuilder.getColumn(name))
 +                throw ire("Column with name '%s' already exists", name);
 +
 +            if (isStatic && table.clusteringColumns().isEmpty())
 +                throw ire("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
 +
 +            ColumnMetadata droppedColumn = table.getDroppedColumn(name.bytes);
 +            if (null != droppedColumn)
 +            {
 +                // After #8099, not safe to re-add columns of incompatible types - until *maybe* deser logic with dropped
 +                // columns is pushed deeper down the line. The latter would still be problematic in cases of schema races.
 +                if (!type.isValueCompatibleWith(droppedColumn.type))
 +                {
 +                    throw ire("Cannot re-add previously dropped column '%s' of type %s, incompatible with previous type %s",
 +                              name,
 +                              type.asCQL3Type(),
 +                              droppedColumn.type.asCQL3Type());
 +                }
 +
 +                if (droppedColumn.isStatic() != isStatic)
 +                {
 +                    throw ire("Cannot re-add previously dropped column '%s' of kind %s, incompatible with previous kind %s",
 +                              name,
 +                              isStatic ? ColumnMetadata.Kind.STATIC : ColumnMetadata.Kind.REGULAR,
 +                              droppedColumn.kind);
 +                }
 +
 +                // Cannot re-add a dropped counter column. See #7831.
 +                if (table.isCounter())
 +                    throw ire("Cannot re-add previously dropped counter column %s", name);
 +            }
 +
 +            if (isStatic)
 +                tableBuilder.addStaticColumn(name, type);
 +            else
 +                tableBuilder.addRegularColumn(name, type);
 +
 +            if (!isStatic)
 +            {
 +                for (ViewMetadata view : keyspace.views.forTable(table.id))
 +                {
 +                    if (view.includeAllColumns)
 +                    {
 +                        ColumnMetadata viewColumn = ColumnMetadata.regularColumn(view.metadata, name.bytes, type);
 +                        viewsBuilder.put(viewsBuilder.get(view.name()).withAddedRegularColumn(viewColumn));
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> DROP <column>
 +     * ALTER TABLE <table> DROP ( <column>, <column1>, ... <columnn>)
 +     */
 +    // TODO: swap UDT refs with expanded tuples on drop
 +    private static class DropColumns extends AlterTableStatement
 +    {
 +        private final Set<ColumnIdentifier> removedColumns;
 +        private final Long timestamp;
 +
 +        private DropColumns(String keyspaceName, String tableName, Set<ColumnIdentifier> removedColumns, Long timestamp)
 +        {
 +            super(keyspaceName, tableName);
 +            this.removedColumns = removedColumns;
 +            this.timestamp = timestamp;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            TableMetadata.Builder builder = table.unbuild();
 +            removedColumns.forEach(c -> dropColumn(keyspace, table, c, builder));
 +            return keyspace.withSwapped(keyspace.tables.withSwapped(builder.build()));
 +        }
 +
 +        private void dropColumn(KeyspaceMetadata keyspace, TableMetadata table, ColumnIdentifier column, TableMetadata.Builder builder)
 +        {
 +            ColumnMetadata currentColumn = table.getColumn(column);
 +            if (null == currentColumn)
 +                throw ire("Column %s was not found in table '%s'", column, table);
 +
 +            if (currentColumn.isPrimaryKeyColumn())
 +                throw ire("Cannot drop PRIMARY KEY column %s", column);
 +
 +            /*
 +             * Cannot allow dropping top-level columns of user defined types that aren't frozen because we cannot convert
 +             * the type into an equivalent tuple: we only support frozen tuples currently. And as such we cannot persist
 +             * the correct type in system_schema.dropped_columns.
 +             */
 +            if (currentColumn.type.isUDT() && currentColumn.type.isMultiCell())
 +                throw ire("Cannot drop non-frozen column %s of user type %s", column, currentColumn.type.asCQL3Type());
 +
 +            // TODO: some day try and find a way to not rely on Keyspace/IndexManager/Index to find dependent indexes
 +            Set<IndexMetadata> dependentIndexes = Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(currentColumn);
 +            if (!dependentIndexes.isEmpty())
 +            {
 +                throw ire("Cannot drop column %s because it has dependent secondary indexes (%s)",
 +                          currentColumn,
 +                          join(", ", transform(dependentIndexes, i -> i.name)));
 +            }
 +
 +            if (!isEmpty(keyspace.views.forTable(table.id)))
 +                throw ire("Cannot drop column %s on base table %s with materialized views", currentColumn, table.name);
 +
 +            builder.removeRegularOrStaticColumn(column);
 +            builder.recordColumnDrop(currentColumn, getTimestamp());
 +        }
 +
 +        /**
 +         * @return timestamp from query, otherwise return current time in micros
 +         */
 +        private long getTimestamp()
 +        {
 +            return timestamp == null ? ClientState.getTimestamp() : timestamp;
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> RENAME <column> TO <column>;
 +     */
 +    private static class RenameColumns extends AlterTableStatement
 +    {
 +        private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns;
 +
 +        private RenameColumns(String keyspaceName, String tableName, Map<ColumnIdentifier, ColumnIdentifier> renamedColumns)
 +        {
 +            super(keyspaceName, tableName);
 +            this.renamedColumns = renamedColumns;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            TableMetadata.Builder tableBuilder = table.unbuild();
 +            Views.Builder viewsBuilder = keyspace.views.unbuild();
 +            renamedColumns.forEach((o, n) -> renameColumn(keyspace, table, o, n, tableBuilder, viewsBuilder));
 +
 +            return keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build()))
 +                           .withSwapped(viewsBuilder.build());
 +        }
 +
 +        private void renameColumn(KeyspaceMetadata keyspace,
 +                                  TableMetadata table,
 +                                  ColumnIdentifier oldName,
 +                                  ColumnIdentifier newName,
 +                                  TableMetadata.Builder tableBuilder,
 +                                  Views.Builder viewsBuilder)
 +        {
 +            ColumnMetadata column = table.getExistingColumn(oldName);
 +            if (null == column)
 +                throw ire("Column %s was not found in table %s", oldName, table);
 +
 +            if (!column.isPrimaryKeyColumn())
 +                throw ire("Cannot rename non PRIMARY KEY column %s", oldName);
 +
 +            if (null != table.getColumn(newName))
 +            {
 +                throw ire("Cannot rename column %s to %s in table '%s'; another column with that name already exists",
 +                          oldName,
 +                          newName,
 +                          table);
 +            }
 +
 +            // TODO: some day try and find a way to not rely on Keyspace/IndexManager/Index to find dependent indexes
 +            Set<IndexMetadata> dependentIndexes = Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(column);
 +            if (!dependentIndexes.isEmpty())
 +            {
 +                throw ire("Can't rename column %s because it has dependent secondary indexes (%s)",
 +                          oldName,
 +                          join(", ", transform(dependentIndexes, i -> i.name)));
 +            }
 +
 +            for (ViewMetadata view : keyspace.views.forTable(table.id))
 +            {
 +                if (view.includes(oldName))
 +                {
 +                    viewsBuilder.put(viewsBuilder.get(view.name()).withRenamedPrimaryKeyColumn(oldName, newName));
 +                }
 +            }
 +
 +            tableBuilder.renamePrimaryKeyColumn(oldName, newName);
 +        }
 +    }
 +
 +    /**
 +     * ALTER TABLE <table> WITH <property> = <value>
 +     */
 +    private static class AlterOptions extends AlterTableStatement
 +    {
 +        private final TableAttributes attrs;
 +
 +        private AlterOptions(String keyspaceName, String tableName, TableAttributes attrs)
 +        {
 +            super(keyspaceName, tableName);
 +            this.attrs = attrs;
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            attrs.validate();
 +
 +            TableParams params = attrs.asAlteredTableParams(table.params);
 +
 +            if (table.isCounter() && params.defaultTimeToLive > 0)
 +                throw ire("Cannot set default_time_to_live on a table with counters");
 +
 +            if (!isEmpty(keyspace.views.forTable(table.id)) && params.gcGraceSeconds == 0)
 +            {
 +                throw ire("Cannot alter gc_grace_seconds of the base table of a " +
 +                          "materialized view to 0, since this value is used to TTL " +
 +                          "undelivered updates. Setting gc_grace_seconds too low might " +
 +                          "cause undelivered updates to expire " +
 +                          "before being replayed.");
 +            }
 +
 +            if (keyspace.createReplicationStrategy().hasTransientReplicas()
 +                && params.readRepair != ReadRepairStrategy.NONE)
 +            {
 +                throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
 +            }
 +
 +            return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
 +        }
 +    }
 +
 +
 +    /**
 +     * ALTER TABLE <table> DROP COMPACT STORAGE
 +     */
 +    private static class DropCompactStorage extends AlterTableStatement
 +    {
++        private static final Logger logger = LoggerFactory.getLogger(AlterTableStatement.class);
++        private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);
 +        private DropCompactStorage(String keyspaceName, String tableName)
 +        {
 +            super(keyspaceName, tableName);
 +        }
 +
 +        public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table)
 +        {
 +            if (!table.isCompactTable())
 +                throw AlterTableStatement.ire("Cannot DROP COMPACT STORAGE on table without COMPACT STORAGE");
 +
++            validateCanDropCompactStorage();
++
 +            return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(ImmutableSet.of(TableMetadata.Flag.COMPOUND))));
 +        }
++
++        /**
++         * Throws if DROP COMPACT STORAGE cannot be used (yet) because the cluster is not sufficiently upgraded. To be able
++         * to use DROP COMPACT STORAGE, we need to ensure that no pre-3.0 sstables exists in the cluster, as we won't be
++         * able to read them anymore once COMPACT STORAGE is dropped (see CASSANDRA-15897). In practice, this method checks
++         * 3 things:
++         *   1) that all nodes are on 3.0+. We need this because 2.x nodes don't advertise their sstable versions.
++         *   2) for 3.0+, we use the new (CASSANDRA-15897) sstables versions set gossiped by all nodes to ensure all
++         *      sstables have been upgraded cluster-wise.
++         *   3) if the cluster still has some 3.0 nodes that predate CASSANDRA-15897, we will not have the sstable versions
++         *      for them. In that case, we also refuse DROP COMPACT (even though it may well be safe at this point) and ask
++         *      the user to upgrade all nodes.
++         */
++        private void validateCanDropCompactStorage()
++        {
++            Set<InetAddressAndPort> before4 = new HashSet<>();
++            Set<InetAddressAndPort> preC15897nodes = new HashSet<>();
++            Set<InetAddressAndPort> with2xSStables = new HashSet<>();
++            Splitter onComma = Splitter.on(',').omitEmptyStrings().trimResults();
++            for (InetAddressAndPort node : StorageService.instance.getTokenMetadata().getAllEndpoints())
++            {
++                if (MessagingService.instance().versions.knows(node) &&
++                    MessagingService.instance().versions.getRaw(node) < MessagingService.VERSION_40)
++                {
++                    before4.add(node);
++                    continue;
++                }
++
++                String sstableVersionsString = Gossiper.instance.getApplicationState(node, ApplicationState.SSTABLE_VERSIONS);
++                if (sstableVersionsString == null)
++                {
++                    preC15897nodes.add(node);
++                    continue;
++                }
++
++                try
++                {
++                    boolean has2xSStables = onComma.splitToList(sstableVersionsString)
++                                                   .stream()
++                                                   .anyMatch(v -> v.compareTo("big-ma")<=0);
++                    if (has2xSStables)
++                        with2xSStables.add(node);
++                }
++                catch (IllegalArgumentException e)
++                {
++                    // Means VersionType::fromString didn't parse a version correctly. Which shouldn't happen, we shouldn't
++                    // have garbage in Gossip. But crashing the request is not ideal, so we log the error but ignore the
++                    // node otherwise.
++                    noSpamLogger.error("Unexpected error parsing sstable versions from gossip for {} (gossiped value " +
++                                       "is '{}'). This is a bug and should be reported. Cannot ensure that {} has no " +
++                                       "non-upgraded 2.x sstables anymore. If after this DROP COMPACT STORAGE some old " +
++                                       "sstables cannot be read anymore, please use `upgradesstables` with the " +
++                                       "`--force-compact-storage-on` option.", node, sstableVersionsString, node);
++                }
++            }
++
++            if (!before4.isEmpty())
++                throw new InvalidRequestException(format("Cannot DROP COMPACT STORAGE as some nodes in the cluster (%s) " +
++                                                         "are not on 4.0+ yet. Please upgrade those nodes and run " +
++                                                         "`upgradesstables` before retrying.", before4));
++            if (!preC15897nodes.isEmpty())
++                throw new InvalidRequestException(format("Cannot guarantee that DROP COMPACT STORAGE is safe as some nodes " +
++                                                         "in the cluster (%s) do not have https://issues.apache.org/jira/browse/CASSANDRA-15897. " +
++                                                         "Please upgrade those nodes and retry.", preC15897nodes));
++            if (!with2xSStables.isEmpty())
++                throw new InvalidRequestException(format("Cannot DROP COMPACT STORAGE as some nodes in the cluster (%s) " +
++                                                         "has some non-upgraded 2.x sstables. Please run `upgradesstables` " +
++                                                         "on those nodes before retrying", with2xSStables));
++        }
 +    }
 +
 +    public static final class Raw extends CQLStatement.Raw
 +    {
 +        private enum Kind
 +        {
 +            ALTER_COLUMN, ADD_COLUMNS, DROP_COLUMNS, RENAME_COLUMNS, ALTER_OPTIONS, DROP_COMPACT_STORAGE
 +        }
 +
 +        private final QualifiedName name;
 +
 +        private Kind kind;
 +
 +        // ADD
 +        private final List<AddColumns.Column> addedColumns = new ArrayList<>();
 +
 +        // DROP
 +        private final Set<ColumnIdentifier> droppedColumns = new HashSet<>();
 +        private Long timestamp = null; // will use execution timestamp if not provided by query
 +
 +        // RENAME
 +        private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns = new HashMap<>();
 +
 +        // OPTIONS
 +        public final TableAttributes attrs = new TableAttributes();
 +
 +        public Raw(QualifiedName name)
 +        {
 +            this.name = name;
 +        }
 +
 +        public AlterTableStatement prepare(ClientState state)
 +        {
 +            String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : state.getKeyspace();
 +            String tableName = name.getName();
 +
 +            switch (kind)
 +            {
 +                case          ALTER_COLUMN: return new AlterColumn(keyspaceName, tableName);
 +                case           ADD_COLUMNS: return new AddColumns(keyspaceName, tableName, addedColumns);
 +                case          DROP_COLUMNS: return new DropColumns(keyspaceName, tableName, droppedColumns, timestamp);
 +                case        RENAME_COLUMNS: return new RenameColumns(keyspaceName, tableName, renamedColumns);
 +                case         ALTER_OPTIONS: return new AlterOptions(keyspaceName, tableName, attrs);
 +                case  DROP_COMPACT_STORAGE: return new DropCompactStorage(keyspaceName, tableName);
 +            }
 +
 +            throw new AssertionError();
 +        }
 +
 +        public void alter(ColumnIdentifier name, CQL3Type.Raw type)
 +        {
 +            kind = Kind.ALTER_COLUMN;
 +        }
 +
 +        public void add(ColumnIdentifier name, CQL3Type.Raw type, boolean isStatic)
 +        {
 +            kind = Kind.ADD_COLUMNS;
 +            addedColumns.add(new AddColumns.Column(name, type, isStatic));
 +        }
 +
 +        public void drop(ColumnIdentifier name)
 +        {
 +            kind = Kind.DROP_COLUMNS;
 +            droppedColumns.add(name);
 +        }
 +
 +        public void dropCompactStorage()
 +        {
 +            kind = Kind.DROP_COMPACT_STORAGE;
 +        }
 +
 +        public void timestamp(long timestamp)
 +        {
 +            this.timestamp = timestamp;
 +        }
 +
 +        public void rename(ColumnIdentifier from, ColumnIdentifier to)
 +        {
 +            kind = Kind.RENAME_COLUMNS;
 +            renamedColumns.put(from, to);
 +        }
 +
 +        public void attrs()
 +        {
 +            this.kind = Kind.ALTER_OPTIONS;
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f3c9a66,7169245..deece30
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -116,21 -88,17 +116,21 @@@ public class CompactionStrategyManager 
      /**
       * Variables guarded by read and write lock above
       */
 -    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
 -    private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
 -    private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
 +    private final PendingRepairHolder transientRepairs;
 +    private final PendingRepairHolder pendingRepairs;
 +    private final CompactionStrategyHolder repaired;
 +    private final CompactionStrategyHolder unrepaired;
 +
 +    private final ImmutableList<AbstractStrategyHolder> holders;
 +
      private volatile CompactionParams params;
      private DiskBoundaries currentBoundaries;
--    private volatile boolean enabled = true;
++    private volatile boolean enabled;
      private volatile boolean isActive = true;
  
 -    /**
 +    /*
          We keep a copy of the schema compaction parameters here to be able to decide if we
 -        should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
 +        should update the compaction strategy in maybeReload() due to an ALTER.
  
          If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
          we will use the new compaction parameters.
@@@ -744,10 -580,10 +744,11 @@@
          readLock.lock();
          try
          {
 +
              if (notification instanceof SSTableAddedNotification)
              {
-                 handleFlushNotification(((SSTableAddedNotification) notification).added);
+                 SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+                 handleFlushNotification(flushedNotification.added);
              }
              else if (notification instanceof SSTableListChangedNotification)
              {
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 9418724,5d5714a..3d72a11
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -107,7 -106,7 +107,7 @@@ public class Tracke
  
      Pair<View, View> apply(Function<View, View> function)
      {
--        return apply(Predicates.<View>alwaysTrue(), function);
++        return apply(Predicates.alwaysTrue(), function);
      }
  
      Throwable apply(Function<View, View> function, Throwable accumulate)
@@@ -236,7 -243,7 +244,7 @@@
  
      public Throwable dropSSTables(Throwable accumulate)
      {
--        return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate);
++        return dropSSTables(Predicates.alwaysTrue(), OperationType.UNKNOWN, accumulate);
      }
  
      /**
@@@ -267,7 -274,7 +275,7 @@@
                      accumulate = updateSizeTracking(removed, emptySet(), accumulate);
                      accumulate = release(selfRefs(removed), accumulate);
                      // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
--                    accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
++                    accumulate = notifySSTablesChanged(removed, Collections.emptySet(), txnLogs.type(), accumulate);
                  }
              }
              catch (Throwable t)
@@@ -291,13 -298,13 +299,7 @@@
       */
      public void removeUnreadableSSTables(final File directory)
      {
--        maybeFail(dropSSTables(new Predicate<SSTableReader>()
--        {
--            public boolean apply(SSTableReader reader)
--            {
--                return reader.descriptor.directory.equals(directory);
--            }
--        }, OperationType.UNKNOWN, null));
++        maybeFail(dropSSTables(reader -> reader.descriptor.directory.equals(directory), OperationType.UNKNOWN, null));
      }
  
  
@@@ -371,7 -378,7 +373,7 @@@
          notifyDiscarded(memtable);
  
          // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-         fail = notifyAdded(sstables, memtable, fail);
 -        fail = notifyAdded(sstables, false, fail);
++        fail = notifyAdded(sstables, false, memtable, fail);
  
          if (!isDummy() && !cfstore.isValid())
              dropSSTables();
@@@ -429,9 -436,14 +431,14 @@@
          return accumulate;
      }
  
-     Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, Throwable accumulate)
 -    Throwable notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables, Throwable accumulate)
++    Throwable notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables, Memtable memtable, Throwable accumulate)
      {
-         INotification notification = new SSTableAddedNotification(added, memtable);
+         INotification notification;
+         if (!isInitialSSTables)
 -            notification = new SSTableAddedNotification(added);
++            notification = new SSTableAddedNotification(added, memtable);
+         else
+             notification = new InitialSSTableAddedNotification(added);
+ 
          for (INotificationConsumer subscriber : subscribers)
          {
              try
@@@ -446,9 -458,9 +453,9 @@@
          return accumulate;
      }
  
-     public void notifyAdded(Iterable<SSTableReader> added)
+     void notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables)
      {
-         maybeFail(notifyAdded(added, null, null));
 -        maybeFail(notifyAdded(added, isInitialSSTables, null));
++        maybeFail(notifyAdded(added, isInitialSSTables, null, null));
      }
  
      public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
@@@ -529,8 -533,8 +536,6 @@@
      @VisibleForTesting
      public void removeUnsafe(Set<SSTableReader> toRemove)
      {
--        Pair<View, View> result = apply(view -> {
--            return updateLiveSet(toRemove, emptySet()).apply(view);
--        });
++        Pair<View, View> result = apply(view -> updateLiveSet(toRemove, emptySet()).apply(view));
      }
  }
diff --cc src/java/org/apache/cassandra/gms/ApplicationState.java
index d31f50c,70e6d9c..4e20d62
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@@ -17,10 -17,17 +17,18 @@@
   */
  package org.apache.cassandra.gms;
  
+ /**
+  * The various "states" exchanged through Gossip.
+  *
+  * <p><b>Important Note:</b> Gossip uses the ordinal of this enum in the messages it exchanges, so values in that enum
+  * should <i>not</i> be re-ordered or removed. The end of this enum should also always include some "padding" so that
+  * if newer versions add new states, old nodes that don't know about those new states don't "break" deserializing those
+  * states.
+  */
  public enum ApplicationState
  {
 -    STATUS,
 +    // never remove a state here, ordering matters.
 +    @Deprecated STATUS, //Deprecated and unsued in 4.0, stop publishing in 5.0, reclaim in 6.0
      LOAD,
      SCHEMA,
      DC,
@@@ -35,15 -42,22 +43,24 @@@
      HOST_ID,
      TOKENS,
      RPC_READY,
 -    // We added SSTABLE_VERSIONS in CASSANDRA-15897 in 3.0, and at the time, 3 more ApplicationState had been added
 -    // to newer versions, so we skipped the first 3 of our original padding to ensure SSTABLE_VERSIONS can preserve
 -    // its ordinal accross versions.
 -    X1,
 -    X2,
 -    X3,
 +    // pad to allow adding new states to existing cluster
 +    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
 +    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
 +    STATUS_WITH_PORT, //Replacement for STATUS
+     /**
+      * The set of sstable versions on this node. This will usually be only the "current" sstable format (the one with
+      * which new sstables are written), but may contain more on newly upgraded nodes before `upgradesstable` has been
+      * run.
+      *
+      * <p>The value (a set of sstable {@link org.apache.cassandra.io.sstable.format.VersionAndType}) is serialized as
+      * a comma-separated list.
+      **/
+     SSTABLE_VERSIONS,
 -    // pad to allow adding new states to existing cluster
 +    // DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states above.  See CASSANDRA-16484
 +    X1,
 +    X2,
 +    X3,
 +    X4,
      X5,
      X6,
      X7,
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 1e8604b,d6d9dfb..0242d83
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -81,10 -79,10 +81,10 @@@ public class GossipDigestAckVerbHandle
          }
  
          /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
-         Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>();
 -        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
++        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<>();
          for (GossipDigest gDigest : gDigestList)
          {
 -            InetAddress addr = gDigest.getEndpoint();
 +            InetAddressAndPort addr = gDigest.getEndpoint();
              EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
              if (localEpStatePtr != null)
                  deltaEpStateMap.put(addr, localEpStatePtr);
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 7720379,69f7fee..a092c77
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -105,9 -98,7 +105,9 @@@ public class Gossiper implements IFailu
          SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
          SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
      }
-     private static List<String> ADMINISTRATIVELY_INACTIVE_STATES = Arrays.asList(VersionedValue.HIBERNATE,
-                                                                                  VersionedValue.REMOVED_TOKEN,
-                                                                                  VersionedValue.STATUS_LEFT);
 -
++    private static final List<String> ADMINISTRATIVELY_INACTIVE_STATES = Arrays.asList(VersionedValue.HIBERNATE,
++                                                                                       VersionedValue.REMOVED_TOKEN,
++                                                                                       VersionedValue.STATUS_LEFT);
      private volatile ScheduledFuture<?> scheduledGossipTask;
      private static final ReentrantLock taskLock = new ReentrantLock();
      public final static int intervalInMillis = 1000;
@@@ -124,18 -114,24 +124,18 @@@
  
      // Maximimum difference between generation value and local time we are willing to accept about a peer
      static final int MAX_GENERATION_DIFFERENCE = 86400 * 365;
--    private long fatClientTimeout;
++    private final long fatClientTimeout;
      private final Random random = new Random();
 -    private final Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>()
 -    {
 -        public int compare(InetAddress addr1, InetAddress addr2)
 -        {
 -            return addr1.getHostAddress().compareTo(addr2.getHostAddress());
 -        }
 -    };
  
      /* subscribers for interest in EndpointState change */
--    private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
++    private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<>();
  
      /* live member set */
 -    private final Set<InetAddress> liveEndpoints = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
 +    @VisibleForTesting
 +    final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>();
  
      /* unreachable member set */
 -    private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
 +    private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>();
  
      /* initial seeds for joining the cluster */
      @VisibleForTesting
@@@ -255,10 -200,10 +255,10 @@@
                  taskLock.lock();
  
                  /* Update the local heartbeat counter. */
 -                endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
 +                endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
                  if (logger.isTraceEnabled())
 -                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
 -                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
 +                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
-                 final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
++                final List<GossipDigest> gDigests = new ArrayList<>();
                  Gossiper.instance.makeRandomGossipDigest(gDigests);
  
                  if (gDigests.size() > 0)
@@@ -662,7 -578,7 +662,7 @@@
              for (GossipDigest gDigest : gDigests)
              {
                  sb.append(gDigest);
--                sb.append(" ");
++                sb.append(' ');
              }
              logger.trace("Gossip Digests are : {}", sb);
          }
@@@ -738,10 -652,10 +738,10 @@@
       */
      public void assassinateEndpoint(String address) throws UnknownHostException
      {
 -        InetAddress endpoint = InetAddress.getByName(address);
 +        InetAddressAndPort endpoint = InetAddressAndPort.getByName(address);
          runInGossipStageBlocking(() -> {
              EndpointState epState = endpointStateMap.get(endpoint);
--            Collection<Token> tokens = null;
++            Collection<Token> tokens;
              logger.warn("Assassinating {} via gossip", endpoint);
  
              if (epState == null)
@@@ -1076,7 -953,25 +1076,25 @@@
          return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
      }
  
+     /**
+      * The value for the provided application state for the provided endpoint as currently known by this Gossip instance.
+      *
+      * @param endpoint the endpoint from which to get the endpoint state.
+      * @param state the endpoint state to get.
+      * @return the value of the application state {@code state} for {@code endpoint}, or {@code null} if either
+      * {@code endpoint} is not known by Gossip or has no value for {@code state}.
+      */
 -    public String getApplicationState(InetAddress endpoint, ApplicationState state)
++    public String getApplicationState(InetAddressAndPort endpoint, ApplicationState state)
+     {
+         EndpointState epState = endpointStateMap.get(endpoint);
+         if (epState == null)
+             return null;
+ 
+         VersionedValue value = epState.getApplicationState(state);
+         return value == null ? null : value.value;
+     }
+ 
 -    EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
 +    EndpointState getStateForVersionBiggerThan(InetAddressAndPort forEndpoint, int version)
      {
          EndpointState epState = endpointStateMap.get(forEndpoint);
          EndpointState reqdEndpointState = null;
@@@ -1653,7 -1456,7 +1671,7 @@@
  
      public void start(int generationNumber)
      {
--        start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
++        start(generationNumber, new EnumMap<>(ApplicationState.class));
      }
  
      /**
@@@ -1717,7 -1519,7 +1735,7 @@@
          seedsInShadowRound.clear();
          endpointShadowStateMap.clear();
          // send a completely empty syn
--        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
++        List<GossipDigest> gDigests = new ArrayList<>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                  DatabaseDescriptor.getPartitionerName(),
                  gDigests);
@@@ -1782,67 -1586,6 +1800,67 @@@
          }
      }
  
 +    /**
 +     * JMX interface for triggering an update of the seed node list.
 +     */
 +    public List<String> reloadSeeds()
 +    {
 +        logger.trace("Triggering reload of seed node list");
 +
 +        // Get the new set in the same that buildSeedsList does
 +        Set<InetAddressAndPort> tmp = new HashSet<>();
 +        try
 +        {
 +            for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds())
 +            {
 +                if (seed.equals(FBUtilities.getBroadcastAddressAndPort()))
 +                    continue;
 +                tmp.add(seed);
 +            }
 +        }
 +        // If using the SimpleSeedProvider invalid yaml added to the config since startup could
 +        // cause this to throw. Additionally, third party seed providers may throw exceptions.
 +        // Handle the error and return a null to indicate that there was a problem.
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.warn("Error while getting seed node list: {}", e.getLocalizedMessage());
 +            return null;
 +        }
 +
 +        if (tmp.size() == 0)
 +        {
 +            logger.trace("New seed node list is empty. Not updating seed list.");
 +            return getSeeds();
 +        }
 +
 +        if (tmp.equals(seeds))
 +        {
 +            logger.trace("New seed node list matches the existing list.");
 +            return getSeeds();
 +        }
 +
 +        // Add the new entries
 +        seeds.addAll(tmp);
 +        // Remove the old entries
 +        seeds.retainAll(tmp);
 +        logger.trace("New seed node list after reload {}", seeds);
 +        return getSeeds();
 +    }
 +
 +    /**
 +     * JMX endpoint for getting the list of seeds from the node
 +     */
 +    public List<String> getSeeds()
 +    {
-         List<String> seedList = new ArrayList<String>();
++        List<String> seedList = new ArrayList<>();
 +        for (InetAddressAndPort seed : seeds)
 +        {
 +            seedList.add(seed.toString());
 +        }
 +        return seedList;
 +    }
 +
      // initialize local HB state if needed, i.e., if gossiper has never been started before.
      public void maybeInitializeLocalState(int generationNbr)
      {
@@@ -2072,29 -1793,8 +2090,29 @@@
          return state != null ? state.getReleaseVersion() : null;
      }
  
 +    public Map<String, List<String>> getReleaseVersionsWithPort()
 +    {
-         Map<String, List<String>> results = new HashMap<String, List<String>>();
++        Map<String, List<String>> results = new HashMap<>();
 +        Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
 +
 +        for (InetAddressAndPort host : allHosts)
 +        {
 +            CassandraVersion version = getReleaseVersion(host);
 +            String stringVersion = version == null ? "" : version.toString();
 +            List<String> hosts = results.get(stringVersion);
 +            if (hosts == null)
 +            {
 +                hosts = new ArrayList<>();
 +                results.put(stringVersion, hosts);
 +            }
 +            hosts.add(host.getHostAddressAndPort());
 +        }
 +
 +        return results;
 +    }
 +
      @Nullable
 -    public UUID getSchemaVersion(InetAddress ep)
 +    public UUID getSchemaVersion(InetAddressAndPort ep)
      {
          EndpointState state = getEndpointStateForEndpoint(ep);
          return state != null ? state.getSchemaVersion() : null;
diff --cc src/java/org/apache/cassandra/gms/VersionedValue.java
index 7c54559,ff3c48b..880cb98
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@@ -31,9 -33,10 +33,10 @@@ import org.apache.cassandra.db.TypeSize
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.IVersionedSerializer;
 -import org.apache.cassandra.io.sstable.format.Version;
+ import org.apache.cassandra.io.sstable.format.VersionAndType;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.commons.lang3.StringUtils;
@@@ -109,7 -111,7 +112,7 @@@ public class VersionedValue implements 
      @Override
      public String toString()
      {
--        return "Value(" + value + "," + version + ")";
++        return "Value(" + value + ',' + version + ')';
      }
  
      public byte[] toBytes()
diff --cc src/java/org/apache/cassandra/io/sstable/Descriptor.java
index dc67451,7950e7b..b781ebf
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@@ -77,10 -74,16 +77,16 @@@ public class Descripto
       */
      public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
      {
 -        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
 +        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType);
      }
  
+     @VisibleForTesting
+     public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
+     {
 -        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
++        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType);
+     }
+ 
 -    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)
 +    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
      {
          assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
          this.version = version;
diff --cc src/java/org/apache/cassandra/io/sstable/format/Version.java
index 0e9e303,2b9dcbd..501ae85
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@@ -97,7 -105,7 +97,6 @@@ public abstract class Versio
          return version;
      }
  
--
      @Override
      public boolean equals(Object o)
      {
diff --cc src/java/org/apache/cassandra/schema/Schema.java
index c04c631,0000000..c5c1f36
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@@ -1,890 -1,0 +1,891 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
++import java.net.UnknownHostException;
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.MapDifference;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.functions.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
 +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
 +
 +import static java.lang.String.format;
 +
 +import static com.google.common.collect.Iterables.size;
 +
 +public final class Schema implements SchemaProvider
 +{
 +    public static final Schema instance = new Schema();
 +
 +    private volatile Keyspaces keyspaces = Keyspaces.none();
 +
 +    // UUID -> mutable metadata ref map. We have to update these in place every time a table changes.
 +    private final Map<TableId, TableMetadataRef> metadataRefs = new NonBlockingHashMap<>();
 +
 +    // (keyspace name, index name) -> mutable metadata ref map. We have to update these in place every time an index changes.
 +    private final Map<Pair<String, String>, TableMetadataRef> indexMetadataRefs = new NonBlockingHashMap<>();
 +
 +    // Keyspace objects, one per keyspace. Only one instance should ever exist for any given keyspace.
 +    private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<>();
 +
 +    private volatile UUID version;
 +
 +    private final List<SchemaChangeListener> changeListeners = new CopyOnWriteArrayList<>();
 +
 +    /**
 +     * Initialize empty schema object and load the hardcoded system tables
 +     */
 +    private Schema()
 +    {
 +        if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized())
 +        {
 +            load(SchemaKeyspace.metadata());
 +            load(SystemKeyspace.metadata());
 +        }
 +    }
 +
 +    /**
 +     * load keyspace (keyspace) definitions, but do not initialize the keyspace instances.
 +     * Schema version may be updated as the result.
 +     */
 +    public void loadFromDisk()
 +    {
 +        loadFromDisk(true);
 +    }
 +
 +    /**
 +     * Load schema definitions from disk.
 +     *
 +     * @param updateVersion true if schema version needs to be updated
 +     */
 +    public void loadFromDisk(boolean updateVersion)
 +    {
 +        SchemaDiagnostics.schemataLoading(this);
 +        SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
 +        if (updateVersion)
 +            updateVersion();
 +        SchemaDiagnostics.schemataLoaded(this);
 +    }
 +
 +    /**
 +     * Update (or insert) new keyspace definition
 +     *
 +     * @param ksm The metadata about keyspace
 +     */
 +    synchronized public void load(KeyspaceMetadata ksm)
 +    {
 +        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);
 +
 +        if (previous == null)
 +            loadNew(ksm);
 +        else
 +            reload(previous, ksm);
 +
 +        keyspaces = keyspaces.withAddedOrUpdated(ksm);
 +    }
 +
 +    private void loadNew(KeyspaceMetadata ksm)
 +    {
 +        ksm.tablesAndViews()
 +           .forEach(metadata -> metadataRefs.put(metadata.id, new TableMetadataRef(metadata)));
 +
 +        ksm.tables
 +           .indexTables()
 +           .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata)));
 +
 +        SchemaDiagnostics.metadataInitialized(this, ksm);
 +    }
 +
 +    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
 +    {
 +        Keyspace keyspace = getKeyspaceInstance(updated.name);
 +        if (null != keyspace)
 +            keyspace.setMetadata(updated);
 +
 +        Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, updated.tables);
 +        Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views);
 +
 +        MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables);
 +
 +        // clean up after removed entries
 +        tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id));
 +        viewsDiff.dropped.forEach(view -> metadataRefs.remove(view.metadata.id));
 +
 +        indexesDiff.entriesOnlyOnLeft()
 +                   .values()
 +                   .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get())));
 +
 +        // load up new entries
 +        tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table)));
 +        viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata)));
 +
 +        indexesDiff.entriesOnlyOnRight()
 +                   .values()
 +                   .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 +
 +        // refresh refs to updated ones
 +        tablesDiff.altered.forEach(diff -> metadataRefs.get(diff.after.id).set(diff.after));
 +        viewsDiff.altered.forEach(diff -> metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata));
 +
 +        indexesDiff.entriesDiffering()
 +                   .values()
 +                   .stream()
 +                   .map(MapDifference.ValueDifference::rightValue)
 +                   .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable));
 +
 +        SchemaDiagnostics.metadataReloaded(this, previous, updated, tablesDiff, viewsDiff, indexesDiff);
 +    }
 +
 +    public void registerListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.add(listener);
 +    }
 +
 +    @SuppressWarnings("unused")
 +    public void unregisterListener(SchemaChangeListener listener)
 +    {
 +        changeListeners.remove(listener);
 +    }
 +
 +    /**
 +     * Get keyspace instance by name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return Keyspace object or null if keyspace was not found
 +     */
 +    @Override
 +    public Keyspace getKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.get(keyspaceName);
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
 +    {
 +        TableMetadata metadata = getTableMetadata(id);
 +        if (metadata == null)
 +            return null;
 +
 +        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
 +        if (instance == null)
 +            return null;
 +
 +        return instance.hasColumnFamilyStore(metadata.id)
 +             ? instance.getColumnFamilyStore(metadata.id)
 +             : null;
 +    }
 +
 +    /**
 +     * Store given Keyspace instance to the schema
 +     *
 +     * @param keyspace The Keyspace instance to store
 +     *
 +     * @throws IllegalArgumentException if Keyspace is already stored
 +     */
 +    @Override
 +    public void storeKeyspaceInstance(Keyspace keyspace)
 +    {
 +        if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != null)
 +            throw new IllegalArgumentException(String.format("Keyspace %s was already initialized.", keyspace.getName()));
 +    }
 +
 +    /**
 +     * Remove keyspace from schema
 +     *
 +     * @param keyspaceName The name of the keyspace to remove
 +     *
 +     * @return removed keyspace instance or null if it wasn't found
 +     */
 +    public Keyspace removeKeyspaceInstance(String keyspaceName)
 +    {
 +        return keyspaceInstances.remove(keyspaceName);
 +    }
 +
 +    public Keyspaces snapshot()
 +    {
 +        return keyspaces;
 +    }
 +
 +    /**
 +     * Remove keyspace definition from system
 +     *
 +     * @param ksm The keyspace definition to remove
 +     */
 +    synchronized void unload(KeyspaceMetadata ksm)
 +    {
 +        keyspaces = keyspaces.without(ksm.name);
 +
 +        ksm.tablesAndViews()
 +           .forEach(t -> metadataRefs.remove(t.id));
 +
 +        ksm.tables
 +           .indexTables()
 +           .keySet()
 +           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, name)));
 +
 +        SchemaDiagnostics.metadataRemoved(this, ksm);
 +    }
 +
 +    public int getNumberOfTables()
 +    {
 +        return keyspaces.stream().mapToInt(k -> size(k.tablesAndViews())).sum();
 +    }
 +
 +    public ViewMetadata getView(String keyspaceName, String viewName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        return (ksm == null) ? null : ksm.views.getNullable(viewName);
 +    }
 +
 +    /**
 +     * Get metadata about keyspace by its name
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return The keyspace metadata or null if it wasn't found
 +     */
 +    @Override
 +    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
 +        return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
 +    }
 +
 +    private Set<String> getNonSystemKeyspacesSet()
 +    {
 +        return Sets.difference(keyspaces.names(), SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
 +    }
 +
 +    /**
 +     * @return collection of the non-system keyspaces (note that this count as system only the
 +     * non replicated keyspaces, so keyspace like system_traces which are replicated are actually
 +     * returned. See getUserKeyspace() below if you don't want those)
 +     */
 +    public ImmutableList<String> getNonSystemKeyspaces()
 +    {
 +        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
 +    }
 +
 +    /**
 +     * @return a collection of keyspaces that do not use LocalStrategy for replication
 +     */
 +    public List<String> getNonLocalStrategyKeyspaces()
 +    {
 +        return keyspaces.stream()
 +                        .filter(keyspace -> keyspace.params.replication.klass != LocalStrategy.class)
 +                        .map(keyspace -> keyspace.name)
 +                        .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * @return collection of the user defined keyspaces
 +     */
 +    public List<String> getUserKeyspaces()
 +    {
 +        return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
 +    }
 +
 +    /**
 +     * Get metadata about keyspace inner ColumnFamilies
 +     *
 +     * @param keyspaceName The name of the keyspace
 +     *
 +     * @return metadata about ColumnFamilies the belong to the given keyspace
 +     */
 +    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
 +    {
 +        assert keyspaceName != null;
 +        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
 +        assert ksm != null;
 +        return ksm.tablesAndViews();
 +    }
 +
 +    /**
 +     * @return collection of the all keyspace names registered in the system (system and non-system)
 +     */
 +    public Set<String> getKeyspaces()
 +    {
 +        return keyspaces.names();
 +    }
 +
 +    /* TableMetadata/Ref query/control methods */
 +
 +    /**
 +     * Given a keyspace name and table/view name, get the table metadata
 +     * reference. If the keyspace name or table/view name is not present
 +     * this method returns null.
 +     *
 +     * @return TableMetadataRef object or null if it wasn't found
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
 +    {
 +        TableMetadata tm = getTableMetadata(keyspace, table);
 +        return tm == null
 +             ? null
 +             : metadataRefs.get(tm.id);
 +    }
 +
 +    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String index)
 +    {
 +        return indexMetadataRefs.get(Pair.create(keyspace, index));
 +    }
 +
 +    Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs()
 +    {
 +        return indexMetadataRefs;
 +    }
 +
 +    /**
 +     * Get Table metadata by its identifier
 +     *
 +     * @param id table or view identifier
 +     *
 +     * @return metadata about Table or View
 +     */
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(TableId id)
 +    {
 +        return metadataRefs.get(id);
 +    }
 +
 +    @Override
 +    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
 +    {
 +        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    Map<TableId, TableMetadataRef> getTableMetadataRefs()
 +    {
 +        return metadataRefs;
 +    }
 +
 +    /**
 +     * Given a keyspace name and table name, get the table
 +     * meta data. If the keyspace name or table name is not valid
 +     * this function returns null.
 +     *
 +     * @param keyspace The keyspace name
 +     * @param table The table name
 +     *
 +     * @return TableMetadata object or null if it wasn't found
 +     */
 +    public TableMetadata getTableMetadata(String keyspace, String table)
 +    {
 +        assert keyspace != null;
 +        assert table != null;
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
 +        return ksm == null
 +             ? null
 +             : ksm.getTableOrViewNullable(table);
 +    }
 +
 +    @Override
 +    public TableMetadata getTableMetadata(TableId id)
 +    {
 +        TableMetadata table = keyspaces.getTableOrViewNullable(id);
 +        return null != table ? table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
 +    }
 +
 +    public TableMetadata validateTable(String keyspaceName, String tableName)
 +    {
 +        if (tableName.isEmpty())
 +            throw new InvalidRequestException("non-empty table is required");
 +
 +        KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
 +        if (keyspace == null)
 +            throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName));
 +
 +        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
 +        if (metadata == null)
 +            throw new InvalidRequestException(format("table %s does not exist", tableName));
 +
 +        return metadata;
 +    }
 +
 +    public TableMetadata getTableMetadata(Descriptor descriptor)
 +    {
 +        return getTableMetadata(descriptor.ksname, descriptor.cfname);
 +    }
 +
 +    /* Function helpers */
 +
 +    /**
 +     * Get all function overloads with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @return an empty list if the keyspace or the function name are not found;
 +     *         a non-empty collection of {@link Function} otherwise
 +     */
 +    public Collection<Function> getFunctions(FunctionName name)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name must be fully qualified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Collections.emptyList()
 +             : ksm.functions.get(name);
 +    }
 +
 +    /**
 +     * Find the function with the specified name
 +     *
 +     * @param name fully qualified function name
 +     * @param argTypes function argument types
 +     * @return an empty {@link Optional} if the keyspace or the function name are not found;
 +     *         a non-empty optional of {@link Function} otherwise
 +     */
 +    public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes)
 +    {
 +        if (!name.hasKeyspace())
 +            throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name));
 +
 +        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
 +        return ksm == null
 +             ? Optional.empty()
 +             : ksm.functions.find(name, argTypes);
 +    }
 +
 +    /* Version control */
 +
 +    /**
 +     * @return current schema version
 +     */
 +    public UUID getVersion()
 +    {
 +        return version;
 +    }
 +
 +    /**
 +     * Checks whether the given schema version is the same as the current local schema.
 +     */
 +    public boolean isSameVersion(UUID schemaVersion)
 +    {
 +        return schemaVersion != null && schemaVersion.equals(version);
 +    }
 +
 +    /**
 +     * Checks whether the current schema is empty.
 +     */
 +    public boolean isEmpty()
 +    {
 +        return SchemaConstants.emptyVersion.equals(version);
 +    }
 +
 +    /**
 +     * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
 +     * will be converted into UUID which would act as content-based version of the schema.
 +     */
 +    public void updateVersion()
 +    {
 +        version = SchemaKeyspace.calculateSchemaDigest();
 +        SystemKeyspace.updateSchemaVersion(version);
 +        SchemaDiagnostics.versionUpdated(this);
 +    }
 +
 +    /*
 +     * Like updateVersion, but also announces via gossip
 +     */
 +    public void updateVersionAndAnnounce()
 +    {
 +        updateVersion();
 +        passiveAnnounceVersion();
 +    }
 +
 +    /**
 +     * Announce my version passively over gossip.
 +     * Used to notify nodes as they arrive in the cluster.
 +     */
 +    private void passiveAnnounceVersion()
 +    {
 +        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
 +        SchemaDiagnostics.versionAnnounced(this);
 +    }
 +
 +    /**
 +     * Clear all KS/CF metadata and reset version.
 +     */
 +    public synchronized void clear()
 +    {
 +        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
 +        updateVersionAndAnnounce();
 +        SchemaDiagnostics.schemataCleared(this);
 +    }
 +
 +    /*
 +     * Reload schema from local disk. Useful if a user made changes to schema tables by hand, or has suspicion that
 +     * in-memory representation got out of sync somehow with what's on disk.
 +     */
 +    public synchronized void reloadSchemaAndAnnounceVersion()
 +    {
 +        Keyspaces before = keyspaces.filter(k -> !SchemaConstants.isLocalSystemKeyspace(k.name));
 +        Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces();
 +        merge(Keyspaces.diff(before, after));
 +        updateVersionAndAnnounce();
 +    }
 +
 +    /**
 +     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
 +     * (which also involves fs operations on add/drop ks/cf)
 +     *
 +     * @param mutations the schema changes to apply
 +     *
 +     * @throws ConfigurationException If one of metadata attributes has invalid value
 +     */
 +    public synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations)
 +    {
 +        merge(mutations);
 +        updateVersionAndAnnounce();
 +    }
 +
-     public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now)
++    public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now) throws UnknownHostException
 +    {
 +        KeyspacesDiff diff;
 +        try
 +        {
 +            Keyspaces before = keyspaces;
 +            Keyspaces after = transformation.apply(before);
 +            diff = Keyspaces.diff(before, after);
 +        }
 +        catch (RuntimeException e)
 +        {
 +            return new TransformationResult(e);
 +        }
 +
 +        if (diff.isEmpty())
 +            return new TransformationResult(diff, Collections.emptyList());
 +
 +        Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, now);
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        merge(diff);
 +        updateVersion();
 +        if (!locally)
 +            passiveAnnounceVersion();
 +
 +        return new TransformationResult(diff, mutations);
 +    }
 +
 +    public static final class TransformationResult
 +    {
 +        public final boolean success;
 +        public final RuntimeException exception;
 +        public final KeyspacesDiff diff;
 +        public final Collection<Mutation> mutations;
 +
 +        private TransformationResult(boolean success, RuntimeException exception, KeyspacesDiff diff, Collection<Mutation> mutations)
 +        {
 +            this.success = success;
 +            this.exception = exception;
 +            this.diff = diff;
 +            this.mutations = mutations;
 +        }
 +
 +        TransformationResult(RuntimeException exception)
 +        {
 +            this(false, exception, null, null);
 +        }
 +
 +        TransformationResult(KeyspacesDiff diff, Collection<Mutation> mutations)
 +        {
 +            this(true, null, diff, mutations);
 +        }
 +    }
 +
 +    synchronized void merge(Collection<Mutation> mutations)
 +    {
 +        // only compare the keyspaces affected by this set of schema mutations
 +        Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(mutations);
 +
 +        // fetch the current state of schema for the affected keyspaces only
 +        Keyspaces before = keyspaces.filter(k -> affectedKeyspaces.contains(k.name));
 +
 +        // apply the schema mutations
 +        SchemaKeyspace.applyChanges(mutations);
 +
 +        // apply the schema mutations and fetch the new versions of the altered keyspaces
 +        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 +
 +        merge(Keyspaces.diff(before, after));
 +    }
 +
 +    private void merge(KeyspacesDiff diff)
 +    {
 +        diff.dropped.forEach(this::dropKeyspace);
 +        diff.created.forEach(this::createKeyspace);
 +        diff.altered.forEach(this::alterKeyspace);
 +    }
 +
 +    private void alterKeyspace(KeyspaceDiff delta)
 +    {
 +        SchemaDiagnostics.keyspaceAltering(this, delta);
 +
 +        // drop tables and views
 +        delta.views.dropped.forEach(this::dropView);
 +        delta.tables.dropped.forEach(this::dropTable);
 +
 +        load(delta.after);
 +
 +        // add tables and views
 +        delta.tables.created.forEach(this::createTable);
 +        delta.views.created.forEach(this::createView);
 +
 +        // update tables and views
 +        delta.tables.altered.forEach(diff -> alterTable(diff.after));
 +        delta.views.altered.forEach(diff -> alterView(diff.after));
 +
 +        // deal with all added, and altered views
 +        Keyspace.open(delta.after.name).viewManager.reload(true);
 +
 +        // notify on everything dropped
 +        delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) uda));
 +        delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) udf));
 +        delta.views.dropped.forEach(this::notifyDropView);
 +        delta.tables.dropped.forEach(this::notifyDropTable);
 +        delta.types.dropped.forEach(this::notifyDropType);
 +
 +        // notify on everything created
 +        delta.types.created.forEach(this::notifyCreateType);
 +        delta.tables.created.forEach(this::notifyCreateTable);
 +        delta.views.created.forEach(this::notifyCreateView);
 +        delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) udf));
 +        delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) uda));
 +
 +        // notify on everything altered
 +        if (!delta.before.params.equals(delta.after.params))
 +            notifyAlterKeyspace(delta.before, delta.after);
 +        delta.types.altered.forEach(diff -> notifyAlterType(diff.before, diff.after));
 +        delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, diff.after));
 +        delta.views.altered.forEach(diff -> notifyAlterView(diff.before, diff.after));
 +        delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, diff.after));
 +        delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after));
 +        SchemaDiagnostics.keyspaceAltered(this, delta);
 +    }
 +
 +    private void createKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceCreating(this, keyspace);
 +        load(keyspace);
 +        Keyspace.open(keyspace.name);
 +
 +        notifyCreateKeyspace(keyspace);
 +        keyspace.types.forEach(this::notifyCreateType);
 +        keyspace.tables.forEach(this::notifyCreateTable);
 +        keyspace.views.forEach(this::notifyCreateView);
 +        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
 +        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
 +        SchemaDiagnostics.keyspaceCreated(this, keyspace);
 +    }
 +
 +    private void dropKeyspace(KeyspaceMetadata keyspace)
 +    {
 +        SchemaDiagnostics.keyspaceDroping(this, keyspace);
 +        keyspace.views.forEach(this::dropView);
 +        keyspace.tables.forEach(this::dropTable);
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(keyspace.name);
 +        unload(keyspace);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        keyspace.functions.udas().forEach(this::notifyDropAggregate);
 +        keyspace.functions.udfs().forEach(this::notifyDropFunction);
 +        keyspace.views.forEach(this::notifyDropView);
 +        keyspace.tables.forEach(this::notifyDropTable);
 +        keyspace.types.forEach(this::notifyDropType);
 +        notifyDropKeyspace(keyspace);
 +        SchemaDiagnostics.keyspaceDroped(this, keyspace);
 +    }
 +
 +    private void dropView(ViewMetadata metadata)
 +    {
 +        Keyspace.open(metadata.keyspace()).viewManager.dropView(metadata.name());
 +        dropTable(metadata.metadata);
 +    }
 +
 +    private void dropTable(TableMetadata metadata)
 +    {
 +        SchemaDiagnostics.tableDropping(this, metadata);
 +        ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
 +        assert cfs != null;
 +        // make sure all the indexes are dropped, or else.
 +        cfs.indexManager.markAllIndexesRemoved();
 +        CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), (sstable) -> true, true);
 +        if (DatabaseDescriptor.isAutoSnapshot())
 +            cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
 +        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
 +        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
 +        SchemaDiagnostics.tableDropped(this, metadata);
 +    }
 +
 +    private void createTable(TableMetadata table)
 +    {
 +        SchemaDiagnostics.tableCreating(this, table);
 +        Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true);
 +        SchemaDiagnostics.tableCreated(this, table);
 +    }
 +
 +    private void createView(ViewMetadata view)
 +    {
 +        Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true);
 +    }
 +
 +    private void alterTable(TableMetadata updated)
 +    {
 +        SchemaDiagnostics.tableAltering(this, updated);
 +        Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
 +        SchemaDiagnostics.tableAltered(this, updated);
 +    }
 +
 +    private void alterView(ViewMetadata updated)
 +    {
 +        Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload();
 +    }
 +
 +    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name));
 +    }
 +
 +    private void notifyCreateTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, metadata.name));
 +    }
 +
 +    private void notifyCreateView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onCreateView(view.keyspace(), view.name()));
 +    }
 +
 +    private void notifyCreateType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onCreateType(ut.keyspace, ut.getNameAsString()));
 +    }
 +
 +    private void notifyCreateFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyCreateAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyAlterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterKeyspace(after.name));
 +    }
 +
 +    private void notifyAlterTable(TableMetadata before, TableMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = before.changeAffectsPreparedStatements(after);
 +        changeListeners.forEach(l -> l.onAlterTable(after.keyspace, after.name, changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterView(ViewMetadata before, ViewMetadata after)
 +    {
 +        boolean changeAffectedPreparedStatements = before.metadata.changeAffectsPreparedStatements(after.metadata);
 +        changeListeners.forEach(l ->l.onAlterView(after.keyspace(), after.name(), changeAffectedPreparedStatements));
 +    }
 +
 +    private void notifyAlterType(UserType before, UserType after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterType(after.keyspace, after.getNameAsString()));
 +    }
 +
 +    private void notifyAlterFunction(UDFunction before, UDFunction after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
 +    {
 +        changeListeners.forEach(l -> l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes()));
 +    }
 +
 +    private void notifyDropKeyspace(KeyspaceMetadata ksm)
 +    {
 +        changeListeners.forEach(l -> l.onDropKeyspace(ksm.name));
 +    }
 +
 +    private void notifyDropTable(TableMetadata metadata)
 +    {
 +        changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, metadata.name));
 +    }
 +
 +    private void notifyDropView(ViewMetadata view)
 +    {
 +        changeListeners.forEach(l -> l.onDropView(view.keyspace(), view.name()));
 +    }
 +
 +    private void notifyDropType(UserType ut)
 +    {
 +        changeListeners.forEach(l -> l.onDropType(ut.keyspace, ut.getNameAsString()));
 +    }
 +
 +    private void notifyDropFunction(UDFunction udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +    private void notifyDropAggregate(UDAggregate udf)
 +    {
 +        changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
 +    }
 +
 +
 +    /**
 +     * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
 +     * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty) schema.
 +     */
 +    public static String schemaVersionToString(UUID version)
 +    {
 +        return version == null
 +               ? "unknown"
 +               : SchemaConstants.emptyVersion.equals(version)
 +                 ? "(empty)"
 +                 : version.toString();
 +    }
 +}
diff --cc src/java/org/apache/cassandra/schema/SchemaTransformation.java
index c19ac7c,0000000..e2290a3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
@@@ -1,31 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
++import java.net.UnknownHostException;
++
 +public interface SchemaTransformation
 +{
 +    /**
 +     * Apply a statement transformation to a schema snapshot.
 +     *
 +     * Implementing methods should be side-effect free.
 +     *
 +     * @param schema Keyspaces to base the transformation on
 +     * @return Keyspaces transformed by the statement
 +     */
-     Keyspaces apply(Keyspaces schema);
++    Keyspaces apply(Keyspaces schema) throws UnknownHostException;
 +}
diff --cc src/java/org/apache/cassandra/service/StartupChecks.java
index 85b5836,cb10ab4..dadb0c5
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@@ -35,16 -35,11 +35,14 @@@ import com.google.common.collect.Iterab
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import net.jpountz.lz4.LZ4Factory;
- import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
- import org.apache.cassandra.schema.SchemaKeyspace;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SystemKeyspace;
@@@ -55,12 -52,8 +53,14 @@@ import org.apache.cassandra.io.util.Fil
  import org.apache.cassandra.utils.NativeLibrary;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JavaUtils;
++import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.SigarLibrary;
  
++import static java.lang.String.format;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME;
 +
  /**
   * Verifies that the system and environment is in a fit state to be started.
   * Used in CassandraDaemon#setup() to check various settings and invariants.
@@@ -83,7 -76,7 +83,6 @@@
  public class StartupChecks
  {
      private static final Logger logger = LoggerFactory.getLogger(StartupChecks.class);
--
      // List of checks to run before starting up. If any test reports failure, startup will be halted.
      private final List<StartupCheck> preFlightChecks = new ArrayList<>();
  
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 8c5437c,65596aa..e0ac1b6
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -81,8 -71,11 +81,10 @@@ import org.apache.cassandra.dht.Range
  import org.apache.cassandra.dht.Token.TokenFactory;
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.gms.*;
 -import org.apache.cassandra.hints.HintVerbHandler;
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.io.sstable.SSTableLoader;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.VersionAndType;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.locator.*;
  import org.apache.cassandra.metrics.StorageMetrics;
@@@ -323,6 -293,44 +327,8 @@@ public class StorageService extends Not
          jmxObjectName = "org.apache.cassandra.db:type=StorageService";
          MBeanWrapper.instance.registerMBean(this, jmxObjectName);
          MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME);
+ 
 -        legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName);
 -
 -        /* register the verb handlers */
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE, new ProposeVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, new HintVerbHandler());
 -
 -        // see BootStrapper for a summary of how the bootstrap verbs interact
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPAIR_MESSAGE, new RepairMessageVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_SHUTDOWN, new GossipShutdownVerbHandler());
 -
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
 -
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler());
 -
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
 -
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
 -        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
 -
+         sstablesTracker = new SSTablesGlobalTracker(SSTableFormat.Type.current());
      }
  
      public void registerDaemon(CassandraDaemon daemon)
@@@ -940,13 -899,18 +946,14 @@@
              // for bootstrap to get the load info it needs.
              // (we won't be part of the storage ring though until we add a counterId to our state, below.)
              // Seed the host ID-to-endpoint map with our own ID.
 -            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
 +            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddressAndPort());
              appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
              appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
 -            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
 +            appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort()));
 +            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress()));
              appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+             appStates.put(ApplicationState.SSTABLE_VERSIONS, valueFactory.sstableVersions(sstablesTracker.versionsInUse()));
  
 -            // load the persisted ring state. This used to be done earlier in the init process,
 -            // but now we always perform a shadow round when preparing to join and we have to
 -            // clear endpoint states after doing that.
 -            loadRingState();
 -
              logger.info("Starting up server gossip");
              Gossiper.instance.register(this);
              Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
index e94c2c4,0000000..8d051d3
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java
@@@ -1,57 -1,0 +1,61 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.upgrade;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.shared.Versions;
 +
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 +
 +public class CompactStorage3to4UpgradeTest extends UpgradeTestBase
 +{
 +    public static final String TABLE_NAME = "cs_tbl";
 +
 +    @Test
 +    public void testNullClusteringValues() throws Throwable
 +    {
 +        new TestCase().nodes(1)
 +                      .upgrade(Versions.Major.v30, Versions.Major.v4)
++                      .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
 +                      .setup(cluster -> {
 +                          String create = "CREATE TABLE %s.%s(k int, c1 int, c2 int, v int, PRIMARY KEY (k, c1, c2)) " +
 +                                          "WITH compaction = { 'class':'LeveledCompactionStrategy', 'enabled':'false'} AND COMPACT STORAGE";
 +                          cluster.schemaChange(String.format(create, KEYSPACE, TABLE_NAME));
 +
 +                          String insert = "INSERT INTO %s.%s(k, c1, v) values (?, ?, ?)";
 +                          cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 1, 1, 1);
 +                          cluster.get(1).flush(KEYSPACE);
 +
 +                          cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 2, 2, 2);
 +                          cluster.get(1).flush(KEYSPACE);
 +
 +                          cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME));
 +                      })
 +                      .runAfterNodeUpgrade((cluster, node) -> {
 +                          cluster.get(1).forceCompact(KEYSPACE, TABLE_NAME);
 +                          Object[][] actual = cluster.get(1).executeInternal(String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE_NAME));
 +                          assertRows(actual, new Object[] {1, 1, null, 1}, new Object[] {2, 2, null, 2});
 +                      })
 +                      .run();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
index 46a7b1d,0000000..bac94a1
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java
@@@ -1,116 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3.validation.operations;
 +
 +import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.cql3.QueryHandler;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaChangeListener;
 +
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class CompactTableTest extends CQLTester
 +{
 +    @Test
 +    public void dropCompactStorageTest() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck) VALUES (1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, null, 1, null));
 +
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck) VALUES (1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, 1, null));
 +
 +        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
 +        alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +        assertRows(execute( "SELECT * FROM %s"),
 +                   row(1, 1, 1));
 +    }
 +
 +    @Test
 +    public void dropCompactStorageShouldInvalidatePreparedStatements() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;");
 +        execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
 +        String templateSelect = "SELECT * FROM %s WHERE pk = 1";
 +        assertRows(execute(templateSelect), row(1, 1, 1));
 +
 +        // Verify that the prepared statement has been added to the cache after our first query.
 +        String formattedQuery = formatQuery(templateSelect);
 +        ConcurrentMap<String, QueryHandler.Prepared> original = QueryProcessor.getInternalStatements();
 +        assertTrue(original.containsKey(formattedQuery));
 +
 +        // Verify that schema change listeners are told statements are affected with DROP COMPACT STORAGE.
 +        SchemaChangeListener listener = new SchemaChangeListener()
 +        {
 +            public void onAlterTable(String keyspace, String table, boolean affectsStatements)
 +            {
 +                assertTrue(affectsStatements);
 +            }
 +        };
 +
 +        Schema.instance.registerListener(listener);
 +
 +        try
 +        {
 +            alterTable("ALTER TABLE %s DROP COMPACT STORAGE");
 +            ConcurrentMap<String, QueryHandler.Prepared> postDrop = QueryProcessor.getInternalStatements();
 +
 +            // Verify that the prepared statement has been removed the cache after DROP COMPACT STORAGE.
 +            assertFalse(postDrop.containsKey(formattedQuery));
 +
 +            // Verify that the prepared statement has been added back to the cache after our second query.
 +            assertRows(execute(templateSelect), row(1, 1, 1));
 +            ConcurrentMap<String, QueryHandler.Prepared> postQuery = QueryProcessor.getInternalStatements();
 +            assertTrue(postQuery.containsKey(formattedQuery));
 +        }
 +        finally
 +        {
 +            // Clean up the listener so this doesn't fail other tests.
 +            Schema.instance.unregisterListener(listener);
 +        }
 +    }
 +
 +    @Test
 +    public void compactStorageSemanticsTest() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE");
 +        execute("INSERT INTO %s (pk, ck) VALUES (?, ?)", 1, 1);
 +        execute("DELETE FROM %s WHERE pk = ? AND ck = ?", 1, 1);
 +        assertEmpty(execute("SELECT * FROM %s WHERE pk = ?", 1));
 +
 +        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2)) WITH COMPACT STORAGE");
 +        execute("INSERT INTO %s (pk, ck1, v) VALUES (?, ?, ?)", 2, 2, 2);
 +        assertRows(execute("SELECT * FROM %s WHERE pk = ?",2),
 +                   row(2, 2, null, 2));
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 910445f,7705d8b..4390b20
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@@ -148,17 -145,22 +148,22 @@@ public class TrackerTes
      @Test
      public void testAddInitialSSTables()
      {
 -        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS));
          Tracker tracker = cfs.getTracker();
+         MockListener listener = new MockListener(false);
+         tracker.subscribe(listener);
          List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs),
                                                         MockSchema.sstable(1, 121, cfs),
                                                         MockSchema.sstable(2, 9, cfs));
          tracker.addInitialSSTables(copyOf(readers));
  
          Assert.assertEquals(3, tracker.view.get().sstables.size());
+         Assert.assertEquals(1, listener.senders.size());
+         Assert.assertEquals(1, listener.received.size());
+         Assert.assertTrue(listener.received.get(0) instanceof InitialSSTableAddedNotification);
  
          for (SSTableReader reader : readers)
 -            Assert.assertTrue(reader.isKeyCacheSetup());
 +            Assert.assertTrue(reader.isKeyCacheEnabled());
  
          Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
      }
@@@ -369,9 -367,8 +376,9 @@@
          MockListener failListener = new MockListener(true);
          tracker.subscribe(failListener);
          tracker.subscribe(listener);
-         Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null));
 -        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null));
++        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null, null));
          Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
 +        Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent());
          listener.received.clear();
          Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
          Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org