You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/26 05:47:31 UTC

[cassandra] branch cassandra-3.11 updated: Prevent broken concurrent schema read/writes

This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new fa532a6  Prevent broken concurrent schema read/writes
fa532a6 is described below

commit fa532a61f810b428ccfdf4964684794a7fc0e885
Author: Bereng <be...@gmail.com>
AuthorDate: Wed Oct 20 10:44:50 2021 +0200

    Prevent broken concurrent schema read/writes
    
    patch by Berenguer Blasi; reviewed by Caleb Rackliffe for CASSANDRA-16996
    
    Co-authored-by: Berenguer Blasi <be...@gmail.com>
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
 src/java/org/apache/cassandra/db/Keyspace.java     |   2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java    | 175 +++++++++++----------
 .../cassandra/schema/SchemaKeyspaceTables.java     |  77 +++++++++
 .../org/apache/cassandra/service/ClientState.java  |  16 +-
 .../cassandra/utils/NativeSSTableLoaderClient.java |  41 +++--
 .../apache/cassandra/config/CFMetaDataTest.java    |   8 +-
 .../cassandra/cql3/PstmtPersistenceTest.java       |   8 +-
 test/unit/org/apache/cassandra/cql3/ViewTest.java  |   3 +-
 .../cql3/validation/operations/AlterTest.java      |  14 +-
 .../cql3/validation/operations/CreateTest.java     |  20 +--
 .../operations/InsertUpdateIfConditionTest.java    |  11 +-
 .../cassandra/schema/SchemaKeyspaceTest.java       |  91 ++++++++++-
 .../service/StorageServiceServerTest.java          |  17 +-
 13 files changed, 340 insertions(+), 143 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 5e39823..eb3de5a 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -452,7 +452,7 @@ public class Keyspace
 
     /**
      * If apply is blocking, apply must not be deferred
-     * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
+     * Otherwise there is a race condition where ALL mutation workers are being blocked ending
      * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
      *
      * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7dc6b23..6b0089f 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -21,40 +21,82 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.CFMetaData.DroppedColumn;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.functions.AbstractFunction;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.RowIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static java.lang.String.format;
-
 import static java.util.stream.Collectors.toList;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
@@ -62,7 +104,7 @@ import static org.apache.cassandra.schema.CQLTypeParser.parse;
 
 /**
  * system_schema.* tables and methods for manipulating them.
- */
+*/
 public final class SchemaKeyspace
 {
     private SchemaKeyspace()
@@ -74,49 +116,14 @@ public final class SchemaKeyspace
     private static final boolean FLUSH_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.test.flush_local_schema_changes", "true"));
     private static final boolean IGNORE_CORRUPTED_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.ignore_corrupted_schema_tables", "false"));
 
-    public static final String KEYSPACES = "keyspaces";
-    public static final String TABLES = "tables";
-    public static final String COLUMNS = "columns";
-    public static final String DROPPED_COLUMNS = "dropped_columns";
-    public static final String TRIGGERS = "triggers";
-    public static final String VIEWS = "views";
-    public static final String TYPES = "types";
-    public static final String FUNCTIONS = "functions";
-    public static final String AGGREGATES = "aggregates";
-    public static final String INDEXES = "indexes";
-
-    /**
-     * The order in this list matters.
-     *
-     * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown whilst
-     * the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need to
-     * try to avoid problems like reading a table without columns or types, for example. So columns and types should be
-     * flushed before tables, which should be flushed before keyspaces.
-     *
-     * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply
-     * iterates in reverse order.
-     *
-     * See CASSANDRA-12213 for more details.
-     */
-    public static final ImmutableList<String> ALL =
-        ImmutableList.of(COLUMNS, DROPPED_COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES, INDEXES, TABLES, VIEWS, KEYSPACES);
-
     /**
      * The tables to which we added the cdc column. This is used in {@link #makeUpdateForSchema} below to make sure we skip that
      * column is cdc is disabled as the columns breaks pre-cdc to post-cdc upgrades (typically, 3.0 -> 3.X).
      */
-    private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(TABLES, VIEWS);
-
-
-    /**
-     * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before CASSANDRA-12213)
-     * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
-     */
-    public static final ImmutableList<String> ALL_FOR_DIGEST =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+    private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(SchemaKeyspaceTables.TABLES, SchemaKeyspaceTables.VIEWS);
 
     private static final CFMetaData Keyspaces =
-        compile(KEYSPACES,
+        compile(SchemaKeyspaceTables.KEYSPACES,
                 "keyspace definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -125,7 +132,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name)))");
 
     private static final CFMetaData Tables =
-        compile(TABLES,
+        compile(SchemaKeyspaceTables.TABLES,
                 "table definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -151,7 +158,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name))");
 
     private static final CFMetaData Columns =
-        compile(COLUMNS,
+        compile(SchemaKeyspaceTables.COLUMNS,
                 "column definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -165,7 +172,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 
     private static final CFMetaData DroppedColumns =
-        compile(DROPPED_COLUMNS,
+        compile(SchemaKeyspaceTables.DROPPED_COLUMNS,
                 "dropped column registry",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -177,7 +184,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 
     private static final CFMetaData Triggers =
-        compile(TRIGGERS,
+        compile(SchemaKeyspaceTables.TRIGGERS,
                 "trigger definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -187,7 +194,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 
     private static final CFMetaData Views =
-        compile(VIEWS,
+        compile(SchemaKeyspaceTables.VIEWS,
                 "view definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -216,7 +223,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
     private static final CFMetaData Indexes =
-        compile(INDEXES,
+        compile(SchemaKeyspaceTables.INDEXES,
                 "secondary index definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -227,7 +234,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
 
     private static final CFMetaData Types =
-        compile(TYPES,
+        compile(SchemaKeyspaceTables.TYPES,
                 "user defined type definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -237,7 +244,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), type_name))");
 
     private static final CFMetaData Functions =
-        compile(FUNCTIONS,
+        compile(SchemaKeyspaceTables.FUNCTIONS,
                 "user defined function definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -251,7 +258,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
 
     private static final CFMetaData Aggregates =
-        compile(AGGREGATES,
+        compile(SchemaKeyspaceTables.AGGREGATES,
                 "user defined aggregate definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
@@ -282,7 +289,7 @@ public final class SchemaKeyspace
     /**
      * Add entries to system_schema.* for the hardcoded system keyspaces
      */
-    public static void saveSystemKeyspacesSchema()
+    public static synchronized void saveSystemKeyspacesSchema()
     {
         KeyspaceMetadata system = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
         KeyspaceMetadata schema = Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME);
@@ -290,7 +297,7 @@ public final class SchemaKeyspace
         long timestamp = FBUtilities.timestampMicros();
 
         // delete old, possibly obsolete entries in schema tables
-        for (String schemaTable : ALL)
+        for (String schemaTable : SchemaKeyspaceTables.ALL)
         {
             String query = String.format("DELETE FROM %s.%s USING TIMESTAMP ? WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, schemaTable);
             for (String systemKeyspace : SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES)
@@ -302,15 +309,15 @@ public final class SchemaKeyspace
         makeCreateKeyspaceMutation(schema, timestamp + 1).build().apply();
     }
 
-    public static void truncate()
+    public static synchronized void truncate()
     {
-        ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
+        SchemaKeyspaceTables.ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
     }
 
     static void flush()
     {
         if (!DatabaseDescriptor.isUnsafeSystem())
-            ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
+            SchemaKeyspaceTables.ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
     }
 
     /**
@@ -328,7 +335,7 @@ public final class SchemaKeyspace
     }
 
     @VisibleForTesting
-    static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+    static synchronized Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
     {
         MessageDigest digest;
         MessageDigest digest30;
@@ -342,7 +349,7 @@ public final class SchemaKeyspace
             throw new RuntimeException(e);
         }
 
-        for (String table : ALL_FOR_DIGEST)
+        for (String table : SchemaKeyspaceTables.ALL_FOR_DIGEST)
         {
             ReadCommand cmd = getReadCommandForTableSchema(table);
             try (ReadExecutionController executionController = cmd.executionController();
@@ -387,7 +394,7 @@ public final class SchemaKeyspace
     {
         Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
 
-        for (String table : ALL)
+        for (String table : SchemaKeyspaceTables.ALL)
             convertSchemaToMutations(mutationMap, table);
 
         return mutationMap.values();
@@ -929,7 +936,7 @@ public final class SchemaKeyspace
 
     private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames)
     {
-        String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+        String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
 
         Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
         for (UntypedResultSet.Row row : query(query))
@@ -947,7 +954,7 @@ public final class SchemaKeyspace
          * We know the keyspace names we are going to query, but we still want to run the SELECT IN
          * query, to filter out the keyspaces that had been dropped by the applied mutation set.
          */
-        String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+        String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
 
         Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
         for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames)))
@@ -967,7 +974,7 @@ public final class SchemaKeyspace
 
     private static KeyspaceParams fetchKeyspaceParams(String keyspaceName)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
 
         UntypedResultSet.Row row = query(query, keyspaceName).one();
         boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString());
@@ -977,7 +984,7 @@ public final class SchemaKeyspace
 
     private static Types fetchTypes(String keyspaceName)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TYPES);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES);
 
         Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName);
         for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -992,7 +999,7 @@ public final class SchemaKeyspace
 
     private static Tables fetchTables(String keyspaceName, Types types)
     {
-        String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
+        String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
 
         Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1010,10 +1017,10 @@ public final class SchemaKeyspace
                                                 "\"DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s'; " +
                                                 "DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s';\" " +
                                                 "If the table is not supposed to be dropped, restore %s.%s sstables from backups.",
-                                                keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS,
-                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES, keyspaceName, tableName,
-                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName,
-                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
+                                                keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS,
+                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, keyspaceName, tableName,
+                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS, keyspaceName, tableName,
+                                                SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
 
                 if (IGNORE_CORRUPTED_SCHEMA_TABLES)
                 {
@@ -1031,7 +1038,7 @@ public final class SchemaKeyspace
 
     private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types)
     {
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
         UntypedResultSet rows = query(query, keyspaceName, tableName);
         if (rows.isEmpty())
             throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName));
@@ -1097,7 +1104,7 @@ public final class SchemaKeyspace
 
     private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
         UntypedResultSet columnRows = query(query, keyspace, table);
         if (columnRows.isEmpty())
             throw new MissingColumns("Columns not found in schema table for " + keyspace + "." + table);
@@ -1132,7 +1139,7 @@ public final class SchemaKeyspace
 
     private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, DROPPED_COLUMNS);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.DROPPED_COLUMNS);
         Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
         for (UntypedResultSet.Row row : query(query, keyspace, table))
         {
@@ -1162,7 +1169,7 @@ public final class SchemaKeyspace
 
     private static Indexes fetchIndexes(String keyspace, String table)
     {
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, INDEXES);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.INDEXES);
         Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
         query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row)));
         return indexes.build();
@@ -1178,7 +1185,7 @@ public final class SchemaKeyspace
 
     private static Triggers fetchTriggers(String keyspace, String table)
     {
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TRIGGERS);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TRIGGERS);
         Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
         query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row)));
         return triggers.build();
@@ -1193,7 +1200,7 @@ public final class SchemaKeyspace
 
     private static Views fetchViews(String keyspaceName, Types types)
     {
-        String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS);
+        String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
 
         Views.Builder views = org.apache.cassandra.schema.Views.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1203,7 +1210,7 @@ public final class SchemaKeyspace
 
     private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
     {
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
         UntypedResultSet rows = query(query, keyspaceName, viewName);
         if (rows.isEmpty())
             throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
@@ -1251,7 +1258,7 @@ public final class SchemaKeyspace
 
     private static Functions fetchUDFs(String keyspaceName, Types types)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, FUNCTIONS);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.FUNCTIONS);
 
         Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1312,7 +1319,7 @@ public final class SchemaKeyspace
 
     private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
     {
-        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, AGGREGATES);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.AGGREGATES);
 
         Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java
new file mode 100644
index 0000000..a1ae445
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableList;
+
+public class SchemaKeyspaceTables
+{
+    public static final String KEYSPACES = "keyspaces";
+    public static final String TABLES = "tables";
+    public static final String COLUMNS = "columns";
+    public static final String DROPPED_COLUMNS = "dropped_columns";
+    public static final String TRIGGERS = "triggers";
+    public static final String VIEWS = "views";
+    public static final String TYPES = "types";
+    public static final String FUNCTIONS = "functions";
+    public static final String AGGREGATES = "aggregates";
+    public static final String INDEXES = "indexes";
+
+    /**
+     * The order in this list matters.
+     *
+     * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown
+     * whilst the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need
+     * to try to avoid problems like reading a table without columns or types, for example. So columns and types should
+     * be flushed before tables, which should be flushed before keyspaces.
+     *
+     * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply
+     * iterates in reverse order.
+     *
+     * See CASSANDRA-12213 for more details.
+     */
+    public static final ImmutableList<String> ALL = ImmutableList.of(COLUMNS,
+                                                                     DROPPED_COLUMNS,
+                                                                     TRIGGERS,
+                                                                     TYPES,
+                                                                     FUNCTIONS,
+                                                                     AGGREGATES,
+                                                                     INDEXES,
+                                                                     TABLES,
+                                                                     VIEWS,
+                                                                     KEYSPACES);
+
+    /**
+     * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before
+     * CASSANDRA-12213) for digest calculations, otherwise the nodes will never agree on the schema during a rolling
+     * upgrade, see CASSANDRA-13559.
+     */
+    public static final ImmutableList<String> ALL_FOR_DIGEST = ImmutableList.of(KEYSPACES,
+                                                                                TABLES,
+                                                                                COLUMNS,
+                                                                                TRIGGERS,
+                                                                                VIEWS,
+                                                                                TYPES,
+                                                                                FUNCTIONS,
+                                                                                AGGREGATES,
+                                                                                INDEXES);
+
+    private SchemaKeyspaceTables()
+    {
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 2ab1e18..155fd69 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -27,7 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraAuthorizer;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.Resources;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -40,11 +48,11 @@ import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.CassandraVersion;
 
 /**
  * State related to a client connection.
@@ -64,7 +72,7 @@ public class ClientState
         for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS))
             READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf));
 
-        SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
+        SchemaKeyspaceTables.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
 
         // neither clients nor tools need authentication/authorization
         if (DatabaseDescriptor.isDaemonInitialized())
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 1f10d2b..3e17ff0 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -19,23 +19,40 @@ package org.apache.cassandra.utils;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.datastax.driver.core.*;
-
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.Token.TokenFactory;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.schema.CQLTypeParser;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.schema.Types;
 
 public class NativeSSTableLoaderClient extends SSTableLoader.Client
@@ -110,7 +127,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
 
     private static Types fetchTypes(String keyspace, Session session)
     {
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TYPES);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES);
 
         Types.RawBuilder types = Types.rawBuilder(keyspace);
         for (Row row : session.execute(query, keyspace))
@@ -135,7 +152,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
     private static Map<String, CFMetaData> fetchTables(String keyspace, Session session, IPartitioner partitioner, Types types)
     {
         Map<String, CFMetaData> tables = new HashMap<>();
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
 
         for (Row row : session.execute(query, keyspace))
         {
@@ -152,7 +169,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
     private static Map<String, CFMetaData> fetchViews(String keyspace, Session session, IPartitioner partitioner, Types types)
     {
         Map<String, CFMetaData> tables = new HashMap<>();
-        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.VIEWS);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
 
         for (Row row : session.execute(query, keyspace))
         {
@@ -181,7 +198,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
 
         String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
                                             SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                            SchemaKeyspace.COLUMNS);
+                                            SchemaKeyspaceTables.COLUMNS);
 
         List<ColumnDefinition> defs = new ArrayList<>();
         for (Row colRow : session.execute(columnsQuery, keyspace, name))
@@ -200,7 +217,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
 
         String droppedColumnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
                                                    SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                                   SchemaKeyspace.DROPPED_COLUMNS);
+                                                   SchemaKeyspaceTables.DROPPED_COLUMNS);
         Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = new HashMap<>();
         for (Row colRow : session.execute(droppedColumnsQuery, keyspace, name))
         {
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 78b372e..2b0dfc0 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -151,15 +151,15 @@ public class CFMetaDataTest
 
         // Test schema conversion
         Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build();
-        PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
-        PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
+        PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES));
+        PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS));
 
-        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES),
+        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES),
                                                                  UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()))
                                                       .one();
         TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
 
-        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS),
+        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS),
                                                                 UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds()));
         Set<ColumnDefinition> columns = new HashSet<>();
         for (UntypedResultSet.Row row : columnsRows)
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 753d6ff..dd477ec 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -31,13 +31,15 @@ import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MD5Digest;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 public class PstmtPersistenceTest extends CQLTester
 {
@@ -67,7 +69,7 @@ public class PstmtPersistenceTest extends CQLTester
         String statement1 = "SELECT * FROM %s WHERE pk = ?";
         String statement2 = "SELECT * FROM %s WHERE key = ?";
         String statement3 = "SELECT * FROM %S WHERE key = ?";
-        stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
+        stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, clientState));
         stmtIds.add(prepareStatement(statement1, clientState));
         stmtIds.add(prepareStatement(statement2, "foo", "bar", clientState));
         clientState.setKeyspace("foo");
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 4073a10..ba7eb2b 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
@@ -1692,7 +1693,7 @@ public class ViewTest extends CQLTester
         // Test the where clause stored in system_schema.views
         String schemaQuery = String.format("SELECT where_clause FROM %s.%s WHERE keyspace_name = ? AND view_name = ?",
                                            SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                           SchemaKeyspace.VIEWS);
+                                           SchemaKeyspaceTables.VIEWS);
         assertRows(execute(schemaQuery, keyspace(), viewName), row(expectedSchemaWhereClause));
 
         for (String insert : insertQueries)
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 33cd379..9d1c0dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.cql3.validation.operations;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 
 import static java.lang.String.format;
 import static org.junit.Assert.assertEquals;
@@ -376,7 +376,7 @@ public class AlterTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -385,7 +385,7 @@ public class AlterTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -394,7 +394,7 @@ public class AlterTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -403,7 +403,7 @@ public class AlterTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("enabled", "false")));
@@ -413,7 +413,7 @@ public class AlterTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("enabled", "false")));
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index edb6668..20da909 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.triggers.ITrigger;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -44,8 +44,10 @@ import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
-import static org.apache.cassandra.cql3.Duration.*;
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_HOUR;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MICRO;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MILLI;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MINUTE;
 
 public class CreateTest extends CQLTester
 {
@@ -700,7 +702,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -710,7 +712,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -720,7 +722,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -730,7 +732,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -740,7 +742,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("enabled", "false")));
@@ -750,7 +752,7 @@ public class CreateTest extends CQLTester
 
         assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TABLES),
+                                  SchemaKeyspaceTables.TABLES),
                            KEYSPACE,
                            currentTable()),
                    row(map("enabled", "false")));
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index e86071a..1fb5d2b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 
 import static java.lang.String.format;
 import static org.junit.Assert.assertEquals;
@@ -1367,7 +1368,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
         schemaChange("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} and durable_writes = true ");
         assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.KEYSPACES),
+                                  SchemaKeyspaceTables.KEYSPACES),
                            keyspace),
                    row(true));
 
@@ -1376,14 +1377,14 @@ public class InsertUpdateIfConditionTest extends CQLTester
 
         assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.KEYSPACES),
+                                  SchemaKeyspaceTables.KEYSPACES),
                            keyspace),
                    row(true));
 
         // drop and confirm
         schemaChange("DROP KEYSPACE IF EXISTS " + keyspace);
 
-        assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES),
+        assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES),
                             keyspace));
     }
 
@@ -1461,7 +1462,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
         execute("CREATE TYPE IF NOT EXISTS mytype (somefield int)");
         assertRows(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?",
                                   SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                  SchemaKeyspace.TYPES),
+                                  SchemaKeyspaceTables.TYPES),
                            KEYSPACE,
                            "mytype"),
                    row("mytype"));
@@ -1474,7 +1475,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
         execute("DROP TYPE IF EXISTS mytype");
         assertEmpty(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?",
                                    SchemaConstants.SCHEMA_KEYSPACE_NAME,
-                                   SchemaKeyspace.TYPES),
+                                   SchemaKeyspaceTables.TYPES),
                             KEYSPACE,
                             "mytype"));
     }
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 19f06e5..34590d6 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -23,17 +23,23 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import com.google.common.collect.ImmutableMap;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
@@ -61,12 +67,15 @@ import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(BMUnitRunner.class)
 public class SchemaKeyspaceTest
 {
     private static final String KEYSPACE1 = "CFMetaDataTest1";
@@ -101,8 +110,74 @@ public class SchemaKeyspaceTest
     @Test
     public void testSchemaPullSynchoricity() throws Exception
     {
-        Method method = SchemaKeyspace.class.getDeclaredMethod("convertSchemaToMutations");
+        for (String methodName : Arrays.asList("convertSchemaToMutations",
+                                               "truncate",
+                                               "saveSystemKeyspacesSchema"))
+        {
+            Method method = SchemaKeyspace.class.getDeclaredMethod(methodName);
+            assertTrue(methodName + " is not thread-safe", Modifier.isSynchronized(method.getModifiers()));
+        }
+
+        Method method = SchemaKeyspace.class.getDeclaredMethod("calculateSchemaDigest", Set.class);
         assertTrue(Modifier.isSynchronized(method.getModifiers()));
+        method = SchemaKeyspace.class.getDeclaredMethod("mergeSchemaAndAnnounceVersion", Collection.class);
+        assertTrue(Modifier.isSynchronized(method.getModifiers()));
+        method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Collection.class);
+        assertTrue(Modifier.isSynchronized(method.getModifiers()));
+        method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Keyspaces.class, Keyspaces.class);
+        assertTrue(Modifier.isSynchronized(method.getModifiers()));        
+    }
+
+    /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */
+    @Test
+    @BMRule(name = "delay partition updates to schema tables",
+            targetClass = "ColumnFamilyStore",
+            targetMethod = "apply",
+            action = "Thread.sleep(5000);",
+            targetLocation = "AT EXIT")
+    public void testNoVisiblePartialSchemaUpdates() throws Exception
+    {
+        String keyspace = "sandbox";
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+
+        SchemaKeyspace.truncate(); // Make sure there's nothing but the create we're about to do
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Future<Void> creation = pool.submit(() -> {
+            barrier.await();
+            createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
+            return null;
+        });
+
+        Future<Collection<Mutation>> mutationsFromThread = pool.submit(() -> {
+            barrier.await();
+
+            // Make sure we actually have a mutation to check for partial modification.
+            Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+            while (mutations.size() == 0)
+                mutations = SchemaKeyspace.convertSchemaToMutations();
+
+            return mutations;
+        });
+
+        creation.get(); // make sure the creation is finished
+
+        Collection<Mutation> mutationsFromConcurrentAccess = mutationsFromThread.get();
+        Collection<Mutation> settledMutations = SchemaKeyspace.convertSchemaToMutations();
+
+        // If the worker thread picked up the creation at all, it should have the same modifications.
+        // In other words, we should see all modifications or none.
+        if (mutationsFromConcurrentAccess.size() == settledMutations.size())
+        {
+            assertEquals(1, settledMutations.size());
+            Mutation mutationFromConcurrentAccess = mutationsFromConcurrentAccess.iterator().next();
+            Mutation settledMutation = settledMutations.iterator().next();
+
+            assertEquals("Read partial schema change!",
+                         settledMutation.getColumnFamilyIds(), mutationFromConcurrentAccess.getColumnFamilyIds());
+        }
+
+        pool.shutdownNow();
     }
 
     @Test
@@ -211,15 +286,15 @@ public class SchemaKeyspaceTest
 
         // Test schema conversion
         Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build();
-        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
-        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
+        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES));
+        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS));
 
-        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES),
+        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES),
                                                                  UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()))
                                                       .one();
         TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
 
-        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS),
+        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS),
                                                                 UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
         Set<ColumnDefinition> columns = new HashSet<>();
         for (UntypedResultSet.Row row : columnsRows)
@@ -246,7 +321,7 @@ public class SchemaKeyspaceTest
     {
         for (PartitionUpdate p : m.getPartitionUpdates())
         {
-            if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+            if (p.metadata().cfName.equals(SchemaKeyspaceTables.TABLES))
                 return true;
         }
         return false;
@@ -332,7 +407,7 @@ public class SchemaKeyspaceTest
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete partition column in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
         executeOnceInternal(query, testKS, testTable, "key");
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
@@ -346,7 +421,7 @@ public class SchemaKeyspaceTest
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete all colmns in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
         executeOnceInternal(query, testKS, testTable);
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 297d19d..d5ab214 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -24,10 +24,17 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,9 +42,8 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -49,9 +55,10 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.PropertyFileSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.ReplicationParams;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -175,7 +182,7 @@ public class StorageServiceServerTest
     public void testTableSnapshot() throws IOException
     {
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
-        StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES, UUID.randomUUID().toString());
+        StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES, UUID.randomUUID().toString());
     }
 
     @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org