You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/26 05:47:31 UTC
[cassandra] branch cassandra-3.11 updated: Prevent broken concurrent schema read/writes
This is an automated email from the ASF dual-hosted git repository.
bereng pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new fa532a6 Prevent broken concurrent schema read/writes
fa532a6 is described below
commit fa532a61f810b428ccfdf4964684794a7fc0e885
Author: Bereng <be...@gmail.com>
AuthorDate: Wed Oct 20 10:44:50 2021 +0200
Prevent broken concurrent schema read/writes
patch by Berenguer Blasi; reviewed by Caleb Rackliffe for CASSANDRA-16996
Co-authored-by: Berenguer Blasi <be...@gmail.com>
Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 175 +++++++++++----------
.../cassandra/schema/SchemaKeyspaceTables.java | 77 +++++++++
.../org/apache/cassandra/service/ClientState.java | 16 +-
.../cassandra/utils/NativeSSTableLoaderClient.java | 41 +++--
.../apache/cassandra/config/CFMetaDataTest.java | 8 +-
.../cassandra/cql3/PstmtPersistenceTest.java | 8 +-
test/unit/org/apache/cassandra/cql3/ViewTest.java | 3 +-
.../cql3/validation/operations/AlterTest.java | 14 +-
.../cql3/validation/operations/CreateTest.java | 20 +--
.../operations/InsertUpdateIfConditionTest.java | 11 +-
.../cassandra/schema/SchemaKeyspaceTest.java | 91 ++++++++++-
.../service/StorageServiceServerTest.java | 17 +-
13 files changed, 340 insertions(+), 143 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 5e39823..eb3de5a 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -452,7 +452,7 @@ public class Keyspace
/**
* If apply is blocking, apply must not be deferred
- * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
+ * Otherwise there is a race condition where ALL mutation workers are being blocked ending
* in a complete deadlock of the mutation stage. See CASSANDRA-12689.
*
* @param mutation the row to write. Must not be modified after calling apply, since commitlog append
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7dc6b23..6b0089f 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -21,40 +21,82 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.CFMetaData.DroppedColumn;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.functions.AbstractFunction;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.RowIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.service.PendingRangeCalculatorService;
-import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static java.lang.String.format;
-
import static java.util.stream.Collectors.toList;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
@@ -62,7 +104,7 @@ import static org.apache.cassandra.schema.CQLTypeParser.parse;
/**
* system_schema.* tables and methods for manipulating them.
- */
+*/
public final class SchemaKeyspace
{
private SchemaKeyspace()
@@ -74,49 +116,14 @@ public final class SchemaKeyspace
private static final boolean FLUSH_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.test.flush_local_schema_changes", "true"));
private static final boolean IGNORE_CORRUPTED_SCHEMA_TABLES = Boolean.parseBoolean(System.getProperty("cassandra.ignore_corrupted_schema_tables", "false"));
- public static final String KEYSPACES = "keyspaces";
- public static final String TABLES = "tables";
- public static final String COLUMNS = "columns";
- public static final String DROPPED_COLUMNS = "dropped_columns";
- public static final String TRIGGERS = "triggers";
- public static final String VIEWS = "views";
- public static final String TYPES = "types";
- public static final String FUNCTIONS = "functions";
- public static final String AGGREGATES = "aggregates";
- public static final String INDEXES = "indexes";
-
- /**
- * The order in this list matters.
- *
- * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown whilst
- * the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need to
- * try to avoid problems like reading a table without columns or types, for example. So columns and types should be
- * flushed before tables, which should be flushed before keyspaces.
- *
- * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply
- * iterates in reverse order.
- *
- * See CASSANDRA-12213 for more details.
- */
- public static final ImmutableList<String> ALL =
- ImmutableList.of(COLUMNS, DROPPED_COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES, INDEXES, TABLES, VIEWS, KEYSPACES);
-
/**
* The tables to which we added the cdc column. This is used in {@link #makeUpdateForSchema} below to make sure we skip that
* column if cdc is disabled, as the column breaks pre-cdc to post-cdc upgrades (typically, 3.0 -> 3.X).
*/
- private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(TABLES, VIEWS);
-
-
- /**
- * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before CASSANDRA-12213)
- * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
- */
- public static final ImmutableList<String> ALL_FOR_DIGEST =
- ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+ private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(SchemaKeyspaceTables.TABLES, SchemaKeyspaceTables.VIEWS);
private static final CFMetaData Keyspaces =
- compile(KEYSPACES,
+ compile(SchemaKeyspaceTables.KEYSPACES,
"keyspace definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -125,7 +132,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name)))");
private static final CFMetaData Tables =
- compile(TABLES,
+ compile(SchemaKeyspaceTables.TABLES,
"table definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -151,7 +158,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name))");
private static final CFMetaData Columns =
- compile(COLUMNS,
+ compile(SchemaKeyspaceTables.COLUMNS,
"column definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -165,7 +172,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, column_name))");
private static final CFMetaData DroppedColumns =
- compile(DROPPED_COLUMNS,
+ compile(SchemaKeyspaceTables.DROPPED_COLUMNS,
"dropped column registry",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -177,7 +184,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, column_name))");
private static final CFMetaData Triggers =
- compile(TRIGGERS,
+ compile(SchemaKeyspaceTables.TRIGGERS,
"trigger definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -187,7 +194,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
private static final CFMetaData Views =
- compile(VIEWS,
+ compile(SchemaKeyspaceTables.VIEWS,
"view definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -216,7 +223,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), view_name))");
private static final CFMetaData Indexes =
- compile(INDEXES,
+ compile(SchemaKeyspaceTables.INDEXES,
"secondary index definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -227,7 +234,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, index_name))");
private static final CFMetaData Types =
- compile(TYPES,
+ compile(SchemaKeyspaceTables.TYPES,
"user defined type definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -237,7 +244,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), type_name))");
private static final CFMetaData Functions =
- compile(FUNCTIONS,
+ compile(SchemaKeyspaceTables.FUNCTIONS,
"user defined function definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -251,7 +258,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
private static final CFMetaData Aggregates =
- compile(AGGREGATES,
+ compile(SchemaKeyspaceTables.AGGREGATES,
"user defined aggregate definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
@@ -282,7 +289,7 @@ public final class SchemaKeyspace
/**
* Add entries to system_schema.* for the hardcoded system keyspaces
*/
- public static void saveSystemKeyspacesSchema()
+ public static synchronized void saveSystemKeyspacesSchema()
{
KeyspaceMetadata system = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
KeyspaceMetadata schema = Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME);
@@ -290,7 +297,7 @@ public final class SchemaKeyspace
long timestamp = FBUtilities.timestampMicros();
// delete old, possibly obsolete entries in schema tables
- for (String schemaTable : ALL)
+ for (String schemaTable : SchemaKeyspaceTables.ALL)
{
String query = String.format("DELETE FROM %s.%s USING TIMESTAMP ? WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, schemaTable);
for (String systemKeyspace : SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES)
@@ -302,15 +309,15 @@ public final class SchemaKeyspace
makeCreateKeyspaceMutation(schema, timestamp + 1).build().apply();
}
- public static void truncate()
+ public static synchronized void truncate()
{
- ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
+ SchemaKeyspaceTables.ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
}
static void flush()
{
if (!DatabaseDescriptor.isUnsafeSystem())
- ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
+ SchemaKeyspaceTables.ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
}
/**
@@ -328,7 +335,7 @@ public final class SchemaKeyspace
}
@VisibleForTesting
- static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+ static synchronized Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
{
MessageDigest digest;
MessageDigest digest30;
@@ -342,7 +349,7 @@ public final class SchemaKeyspace
throw new RuntimeException(e);
}
- for (String table : ALL_FOR_DIGEST)
+ for (String table : SchemaKeyspaceTables.ALL_FOR_DIGEST)
{
ReadCommand cmd = getReadCommandForTableSchema(table);
try (ReadExecutionController executionController = cmd.executionController();
@@ -387,7 +394,7 @@ public final class SchemaKeyspace
{
Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
- for (String table : ALL)
+ for (String table : SchemaKeyspaceTables.ALL)
convertSchemaToMutations(mutationMap, table);
return mutationMap.values();
@@ -929,7 +936,7 @@ public final class SchemaKeyspace
private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames)
{
- String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+ String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
for (UntypedResultSet.Row row : query(query))
@@ -947,7 +954,7 @@ public final class SchemaKeyspace
* We know the keyspace names we are going to query, but we still want to run the SELECT IN
* query, to filter out the keyspaces that had been dropped by the applied mutation set.
*/
- String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+ String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames)))
@@ -967,7 +974,7 @@ public final class SchemaKeyspace
private static KeyspaceParams fetchKeyspaceParams(String keyspaceName)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES);
UntypedResultSet.Row row = query(query, keyspaceName).one();
boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString());
@@ -977,7 +984,7 @@ public final class SchemaKeyspace
private static Types fetchTypes(String keyspaceName)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TYPES);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES);
Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName);
for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -992,7 +999,7 @@ public final class SchemaKeyspace
private static Tables fetchTables(String keyspaceName, Types types)
{
- String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
+ String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1010,10 +1017,10 @@ public final class SchemaKeyspace
"\"DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s'; " +
"DELETE FROM %s.%s WHERE keyspace_name = '%s' AND table_name = '%s';\" " +
"If the table is not supposed to be dropped, restore %s.%s sstables from backups.",
- keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS,
- SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES, keyspaceName, tableName,
- SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS, keyspaceName, tableName,
- SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
+ keyspaceName, tableName, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS,
+ SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, keyspaceName, tableName,
+ SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS, keyspaceName, tableName,
+ SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
if (IGNORE_CORRUPTED_SCHEMA_TABLES)
{
@@ -1031,7 +1038,7 @@ public final class SchemaKeyspace
private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types)
{
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
UntypedResultSet rows = query(query, keyspaceName, tableName);
if (rows.isEmpty())
throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName));
@@ -1097,7 +1104,7 @@ public final class SchemaKeyspace
private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
UntypedResultSet columnRows = query(query, keyspace, table);
if (columnRows.isEmpty())
throw new MissingColumns("Columns not found in schema table for " + keyspace + "." + table);
@@ -1132,7 +1139,7 @@ public final class SchemaKeyspace
private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, DROPPED_COLUMNS);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.DROPPED_COLUMNS);
Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
for (UntypedResultSet.Row row : query(query, keyspace, table))
{
@@ -1162,7 +1169,7 @@ public final class SchemaKeyspace
private static Indexes fetchIndexes(String keyspace, String table)
{
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, INDEXES);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.INDEXES);
Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row)));
return indexes.build();
@@ -1178,7 +1185,7 @@ public final class SchemaKeyspace
private static Triggers fetchTriggers(String keyspace, String table)
{
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TRIGGERS);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TRIGGERS);
Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row)));
return triggers.build();
@@ -1193,7 +1200,7 @@ public final class SchemaKeyspace
private static Views fetchViews(String keyspaceName, Types types)
{
- String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS);
+ String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
Views.Builder views = org.apache.cassandra.schema.Views.builder();
for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1203,7 +1210,7 @@ public final class SchemaKeyspace
private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
{
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
UntypedResultSet rows = query(query, keyspaceName, viewName);
if (rows.isEmpty())
throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
@@ -1251,7 +1258,7 @@ public final class SchemaKeyspace
private static Functions fetchUDFs(String keyspaceName, Types types)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, FUNCTIONS);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.FUNCTIONS);
Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
for (UntypedResultSet.Row row : query(query, keyspaceName))
@@ -1312,7 +1319,7 @@ public final class SchemaKeyspace
private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
{
- String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, AGGREGATES);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.AGGREGATES);
Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
for (UntypedResultSet.Row row : query(query, keyspaceName))
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java
new file mode 100644
index 0000000..a1ae445
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspaceTables.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableList;
+
+public class SchemaKeyspaceTables
+{
+ public static final String KEYSPACES = "keyspaces";
+ public static final String TABLES = "tables";
+ public static final String COLUMNS = "columns";
+ public static final String DROPPED_COLUMNS = "dropped_columns";
+ public static final String TRIGGERS = "triggers";
+ public static final String VIEWS = "views";
+ public static final String TYPES = "types";
+ public static final String FUNCTIONS = "functions";
+ public static final String AGGREGATES = "aggregates";
+ public static final String INDEXES = "indexes";
+
+ /**
+ * The order in this list matters.
+ *
+ * When flushing schema tables, we want to flush them in a way that mitigates the effects of an abrupt shutdown
+ * whilst the tables are being flushed. On startup, we load the schema from disk before replaying the CL, so we need
+ * to try to avoid problems like reading a table without columns or types, for example. So columns and types should
+ * be flushed before tables, which should be flushed before keyspaces.
+ *
+ * When truncating, the order should be reversed. For immutable lists this is an efficient operation that simply
+ * iterates in reverse order.
+ *
+ * See CASSANDRA-12213 for more details.
+ */
+ public static final ImmutableList<String> ALL = ImmutableList.of(COLUMNS,
+ DROPPED_COLUMNS,
+ TRIGGERS,
+ TYPES,
+ FUNCTIONS,
+ AGGREGATES,
+ INDEXES,
+ TABLES,
+ VIEWS,
+ KEYSPACES);
+
+ /**
+ * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before
+ * CASSANDRA-12213) for digest calculations, otherwise the nodes will never agree on the schema during a rolling
+ * upgrade, see CASSANDRA-13559.
+ */
+ public static final ImmutableList<String> ALL_FOR_DIGEST = ImmutableList.of(KEYSPACES,
+ TABLES,
+ COLUMNS,
+ TRIGGERS,
+ VIEWS,
+ TYPES,
+ FUNCTIONS,
+ AGGREGATES,
+ INDEXES);
+
+ private SchemaKeyspaceTables()
+ {
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 2ab1e18..155fd69 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -27,7 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraAuthorizer;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.Resources;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@@ -40,11 +48,11 @@ import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.CassandraVersion;
/**
* State related to a client connection.
@@ -64,7 +72,7 @@ public class ClientState
for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS))
READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf));
- SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
+ SchemaKeyspaceTables.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
// neither clients nor tools need authentication/authorization
if (DatabaseDescriptor.isDaemonInitialized())
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 1f10d2b..3e17ff0 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -19,23 +19,40 @@ package org.apache.cassandra.utils;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.datastax.driver.core.*;
-
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.schema.CQLTypeParser;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.schema.Types;
public class NativeSSTableLoaderClient extends SSTableLoader.Client
@@ -110,7 +127,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
private static Types fetchTypes(String keyspace, Session session)
{
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TYPES);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TYPES);
Types.RawBuilder types = Types.rawBuilder(keyspace);
for (Row row : session.execute(query, keyspace))
@@ -135,7 +152,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
private static Map<String, CFMetaData> fetchTables(String keyspace, Session session, IPartitioner partitioner, Types types)
{
Map<String, CFMetaData> tables = new HashMap<>();
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES);
for (Row row : session.execute(query, keyspace))
{
@@ -152,7 +169,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
private static Map<String, CFMetaData> fetchViews(String keyspace, Session session, IPartitioner partitioner, Types types)
{
Map<String, CFMetaData> tables = new HashMap<>();
- String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.VIEWS);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.VIEWS);
for (Row row : session.execute(query, keyspace))
{
@@ -181,7 +198,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.COLUMNS);
+ SchemaKeyspaceTables.COLUMNS);
List<ColumnDefinition> defs = new ArrayList<>();
for (Row colRow : session.execute(columnsQuery, keyspace, name))
@@ -200,7 +217,7 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
String droppedColumnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.DROPPED_COLUMNS);
+ SchemaKeyspaceTables.DROPPED_COLUMNS);
Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = new HashMap<>();
for (Row colRow : session.execute(droppedColumnsQuery, keyspace, name))
{
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 78b372e..2b0dfc0 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -151,15 +151,15 @@ public class CFMetaDataTest
// Test schema conversion
Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build();
- PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
- PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
+ PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES));
+ PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS));
- UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES),
+ UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES),
UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()))
.one();
TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
- UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS),
+ UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS),
UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds()));
Set<ColumnDefinition> columns = new HashSet<>();
for (UntypedResultSet.Row row : columnsRows)
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 753d6ff..dd477ec 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -31,13 +31,15 @@ import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MD5Digest;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
public class PstmtPersistenceTest extends CQLTester
{
@@ -67,7 +69,7 @@ public class PstmtPersistenceTest extends CQLTester
String statement1 = "SELECT * FROM %s WHERE pk = ?";
String statement2 = "SELECT * FROM %s WHERE key = ?";
String statement3 = "SELECT * FROM %S WHERE key = ?";
- stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
+ stmtIds.add(prepareStatement(statement0, SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES, clientState));
stmtIds.add(prepareStatement(statement1, clientState));
stmtIds.add(prepareStatement(statement2, "foo", "bar", clientState));
clientState.setKeyspace("foo");
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 4073a10..ba7eb2b 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.FBUtilities;
@@ -1692,7 +1693,7 @@ public class ViewTest extends CQLTester
// Test the where clause stored in system_schema.views
String schemaQuery = String.format("SELECT where_clause FROM %s.%s WHERE keyspace_name = ? AND view_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.VIEWS);
+ SchemaKeyspaceTables.VIEWS);
assertRows(execute(schemaQuery, keyspace(), viewName), row(expectedSchemaWhereClause));
for (String insert : insertQueries)
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 33cd379..9d1c0dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.cql3.validation.operations;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
@@ -376,7 +376,7 @@ public class AlterTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -385,7 +385,7 @@ public class AlterTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -394,7 +394,7 @@ public class AlterTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -403,7 +403,7 @@ public class AlterTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("enabled", "false")));
@@ -413,7 +413,7 @@ public class AlterTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("enabled", "false")));
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index edb6668..20da909 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,8 +44,10 @@ import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
-import static org.apache.cassandra.cql3.Duration.*;
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_HOUR;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MICRO;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MILLI;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MINUTE;
public class CreateTest extends CQLTester
{
@@ -700,7 +702,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
@@ -710,7 +712,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -720,7 +722,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -730,7 +732,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
@@ -740,7 +742,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("enabled", "false")));
@@ -750,7 +752,7 @@ public class CreateTest extends CQLTester
assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TABLES),
+ SchemaKeyspaceTables.TABLES),
KEYSPACE,
currentTable()),
row(map("enabled", "false")));
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index e86071a..1fb5d2b 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
@@ -1367,7 +1368,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
schemaChange("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} and durable_writes = true ");
assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.KEYSPACES),
+ SchemaKeyspaceTables.KEYSPACES),
keyspace),
row(true));
@@ -1376,14 +1377,14 @@ public class InsertUpdateIfConditionTest extends CQLTester
assertRows(execute(format("select durable_writes from %s.%s where keyspace_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.KEYSPACES),
+ SchemaKeyspaceTables.KEYSPACES),
keyspace),
row(true));
// drop and confirm
schemaChange("DROP KEYSPACE IF EXISTS " + keyspace);
- assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES),
+ assertEmpty(execute(format("select * from %s.%s where keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES),
keyspace));
}
@@ -1461,7 +1462,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
execute("CREATE TYPE IF NOT EXISTS mytype (somefield int)");
assertRows(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TYPES),
+ SchemaKeyspaceTables.TYPES),
KEYSPACE,
"mytype"),
row("mytype"));
@@ -1474,7 +1475,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
execute("DROP TYPE IF EXISTS mytype");
assertEmpty(execute(format("SELECT type_name from %s.%s where keyspace_name = ? and type_name = ?",
SchemaConstants.SCHEMA_KEYSPACE_NAME,
- SchemaKeyspace.TYPES),
+ SchemaKeyspaceTables.TYPES),
KEYSPACE,
"mytype"));
}
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 19f06e5..34590d6 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -23,17 +23,23 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
@@ -61,12 +67,15 @@ import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(BMUnitRunner.class)
public class SchemaKeyspaceTest
{
private static final String KEYSPACE1 = "CFMetaDataTest1";
@@ -101,8 +110,74 @@ public class SchemaKeyspaceTest
@Test
public void testSchemaPullSynchoricity() throws Exception
{
- Method method = SchemaKeyspace.class.getDeclaredMethod("convertSchemaToMutations");
+ for (String methodName : Arrays.asList("convertSchemaToMutations",
+ "truncate",
+ "saveSystemKeyspacesSchema"))
+ {
+ Method method = SchemaKeyspace.class.getDeclaredMethod(methodName);
+ assertTrue(methodName + " is not thread-safe", Modifier.isSynchronized(method.getModifiers()));
+ }
+
+ Method method = SchemaKeyspace.class.getDeclaredMethod("calculateSchemaDigest", Set.class);
assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ method = SchemaKeyspace.class.getDeclaredMethod("mergeSchemaAndAnnounceVersion", Collection.class);
+ assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Collection.class);
+ assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ method = SchemaKeyspace.class.getDeclaredMethod("mergeSchema", Keyspaces.class, Keyspaces.class);
+ assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ }
+
+ /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes: a schema read racing a CREATE TABLE must see all of its modifications or none. */
+ @Test
+ @BMRule(name = "delay partition updates to schema tables",
+ targetClass = "ColumnFamilyStore",
+ targetMethod = "apply",
+ action = "Thread.sleep(5000);", // injected sleep (5000 ms) runs at the exit of ColumnFamilyStore.apply; presumably widens the window between partition updates
+ targetLocation = "AT EXIT")
+ public void testNoVisiblePartialSchemaUpdates() throws Exception
+ {
+ String keyspace = "sandbox";
+ ExecutorService pool = Executors.newFixedThreadPool(2); // two workers: one submits the CREATE TABLE, the other reads the schema
+
+ SchemaKeyspace.truncate(); // Make sure there's nothing but the create we're about to do
+ CyclicBarrier barrier = new CyclicBarrier(2); // both tasks await this barrier before doing any work, so they are released together
+
+ Future<Void> creation = pool.submit(() -> { // writer task
+ barrier.await();
+ createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
+ return null;
+ });
+
+ Future<Collection<Mutation>> mutationsFromThread = pool.submit(() -> { // reader task
+ barrier.await();
+
+ // Make sure we actually have a mutation to check for partial modification.
+ Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+ while (mutations.size() == 0) // spins, re-reading the schema until at least one mutation is returned
+ mutations = SchemaKeyspace.convertSchemaToMutations();
+
+ return mutations;
+ });
+
+ creation.get(); // make sure the creation is finished
+
+ Collection<Mutation> mutationsFromConcurrentAccess = mutationsFromThread.get(); // what the reader saw while racing the writer
+ Collection<Mutation> settledMutations = SchemaKeyspace.convertSchemaToMutations(); // reference read taken after the creation future completed
+
+ // If the worker thread picked up the creation at all, it should have the same modifications.
+ // In other words, we should see all modifications or none.
+ if (mutationsFromConcurrentAccess.size() == settledMutations.size()) // when the sizes differ, nothing below is asserted
+ {
+ assertEquals(1, settledMutations.size());
+ Mutation mutationFromConcurrentAccess = mutationsFromConcurrentAccess.iterator().next();
+ Mutation settledMutation = settledMutations.iterator().next();
+
+ assertEquals("Read partial schema change!",
+ settledMutation.getColumnFamilyIds(), mutationFromConcurrentAccess.getColumnFamilyIds()); // compares the sets of column family ids touched by each mutation
+ }
+
+ pool.shutdownNow(); // NOTE(review): not in a finally block, so it is skipped if a get() or an assertion above throws
}
@Test
@@ -211,15 +286,15 @@ public class SchemaKeyspaceTest
// Test schema conversion
Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros()).build();
- PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES));
- PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS));
+ PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES));
+ PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS));
- UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES),
+ UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.TABLES),
UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()))
.one();
TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
- UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS),
+ UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS),
UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
Set<ColumnDefinition> columns = new HashSet<>();
for (UntypedResultSet.Row row : columnsRows)
@@ -246,7 +321,7 @@ public class SchemaKeyspaceTest
{
for (PartitionUpdate p : m.getPartitionUpdates())
{
- if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+ if (p.metadata().cfName.equals(SchemaKeyspaceTables.TABLES))
return true;
}
return false;
@@ -332,7 +407,7 @@ public class SchemaKeyspaceTest
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS, testTable));
// Delete partition column in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=? and column_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
executeOnceInternal(query, testKS, testTable, "key");
SchemaKeyspace.fetchNonSystemKeyspaces();
}
@@ -346,7 +421,7 @@ public class SchemaKeyspaceTest
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS, testTable));
// Delete all colmns in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
executeOnceInternal(query, testKS, testTable);
SchemaKeyspace.fetchNonSystemKeyspaces();
}
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 297d19d..d5ab214 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -24,10 +24,17 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,9 +42,8 @@ import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
-import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -49,9 +55,10 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PropertyFileSnitch;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.ReplicationParams;
-import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.SchemaKeyspaceTables;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
@@ -175,7 +182,7 @@ public class StorageServiceServerTest
public void testTableSnapshot() throws IOException
{
// no need to insert extra data, even an "empty" database will have a little information in the system keyspace
- StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.KEYSPACES, UUID.randomUUID().toString());
+ StorageService.instance.takeTableSnapshot(SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.KEYSPACES, UUID.randomUUID().toString());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org