You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:13 UTC
[4/5] cassandra git commit: Isolate schema serialization code
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 8b42e51..4672451 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -30,11 +30,7 @@ import com.datastax.driver.core.UserType;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -47,9 +43,10 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class);
protected final List<ColumnIdentifier> argNames;
+
protected final String language;
protected final String body;
- private final boolean deterministic;
+ protected final boolean isDeterministic;
protected final DataType[] argDataTypes;
protected final DataType returnDataType;
@@ -60,10 +57,10 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
AbstractType<?> returnType,
String language,
String body,
- boolean deterministic)
+ boolean isDeterministic)
{
this(name, argNames, argTypes, UDHelper.driverTypes(argTypes), returnType,
- UDHelper.driverType(returnType), language, body, deterministic);
+ UDHelper.driverType(returnType), language, body, isDeterministic);
}
protected UDFunction(FunctionName name,
@@ -74,14 +71,14 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
DataType returnDataType,
String language,
String body,
- boolean deterministic)
+ boolean isDeterministic)
{
super(name, argTypes, returnType);
assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names";
this.argNames = argNames;
this.language = language;
this.body = body;
- this.deterministic = deterministic;
+ this.isDeterministic = isDeterministic;
this.argDataTypes = argDataTypes;
this.returnDataType = returnDataType;
}
@@ -92,13 +89,13 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
AbstractType<?> returnType,
String language,
String body,
- boolean deterministic)
+ boolean isDeterministic)
throws InvalidRequestException
{
switch (language)
{
- case "java": return JavaSourceUDFFactory.buildUDF(name, argNames, argTypes, returnType, body, deterministic);
- default: return new ScriptBasedUDF(name, argNames, argTypes, returnType, language, body, deterministic);
+ case "java": return JavaSourceUDFFactory.buildUDF(name, argNames, argTypes, returnType, body, isDeterministic);
+ default: return new ScriptBasedUDF(name, argNames, argTypes, returnType, language, body, isDeterministic);
}
}
@@ -111,24 +108,27 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
* 2) we return a meaningful error message if the function is executed (something more precise
* than saying that the function doesn't exist)
*/
- private static UDFunction createBrokenFunction(FunctionName name,
- List<ColumnIdentifier> argNames,
- List<AbstractType<?>> argTypes,
- AbstractType<?> returnType,
- String language,
- String body,
- final InvalidRequestException reason)
+ public static UDFunction createBrokenFunction(FunctionName name,
+ List<ColumnIdentifier> argNames,
+ List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType,
+ String language,
+ String body,
+ final InvalidRequestException reason)
{
return new UDFunction(name, argNames, argTypes, returnType, language, body, true)
{
public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException
{
- throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. "
- + "Please see the server log for more details", this, reason.getMessage()));
+ throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully "
+ + "for the following reason: %s. Please see the server log for details",
+ this,
+ reason.getMessage()));
}
};
}
+
public boolean isAggregate()
{
return false;
@@ -136,7 +136,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
public boolean isPure()
{
- return deterministic;
+ return isDeterministic;
}
public boolean isNative()
@@ -144,13 +144,33 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
return false;
}
+ public List<ColumnIdentifier> argNames()
+ {
+ return argNames;
+ }
+
+ public boolean isDeterministic()
+ {
+ return isDeterministic;
+ }
+
+ public String body()
+ {
+ return body;
+ }
+
+ public String language()
+ {
+ return language;
+ }
+
/**
* Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory}
* and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C*
* serialized representation to the Java object representation.
*
* @param protocolVersion the native protocol version used for serialization
- * @param argIndex index of the UDF input argument
+ * @param argIndex index of the UDF input argument
*/
protected Object compose(int protocolVersion, int argIndex, ByteBuffer value)
{
@@ -169,117 +189,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
}
- private static Mutation makeSchemaMutation(FunctionName name)
- {
- UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaFunctionsTable.getKeyValidator();
- return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace));
- }
-
- public Mutation toSchemaDrop(long timestamp)
- {
- Mutation mutation = makeSchemaMutation(name);
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
-
- Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
- int ldt = (int) (System.currentTimeMillis() / 1000);
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
- }
-
- public static Map<Composite, UDFunction> fromSchema(Row row)
- {
- UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, row);
- Map<Composite, UDFunction> udfs = new HashMap<>(results.size());
- for (UntypedResultSet.Row result : results)
- udfs.put(SystemKeyspace.SchemaFunctionsTable.comparator.make(result.getString("function_name"), result.getBlob("signature")),
- fromSchema(result));
- return udfs;
- }
-
- public Mutation toSchemaUpdate(long timestamp)
- {
- Mutation mutation = makeSchemaMutation(name);
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE);
-
- Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.resetCollection("argument_names");
- adder.resetCollection("argument_types");
- adder.add("return_type", returnType.toString());
- adder.add("language", language);
- adder.add("body", body);
- adder.add("deterministic", deterministic);
-
- for (int i = 0; i < argNames.size(); i++)
- {
- adder.addListEntry("argument_names", argNames.get(i).bytes);
- adder.addListEntry("argument_types", argTypes.get(i).toString());
- }
-
- return mutation;
- }
-
- public static UDFunction fromSchema(UntypedResultSet.Row row)
- {
- String ksName = row.getString("keyspace_name");
- String functionName = row.getString("function_name");
- FunctionName name = new FunctionName(ksName, functionName);
-
- List<String> names = row.getList("argument_names", UTF8Type.instance);
- List<String> types = row.getList("argument_types", UTF8Type.instance);
-
- List<ColumnIdentifier> argNames;
- if (names == null)
- argNames = Collections.emptyList();
- else
- {
- argNames = new ArrayList<>(names.size());
- for (String arg : names)
- argNames.add(new ColumnIdentifier(arg, true));
- }
-
- List<AbstractType<?>> argTypes;
- if (types == null)
- argTypes = Collections.emptyList();
- else
- {
- argTypes = new ArrayList<>(types.size());
- for (String type : types)
- argTypes.add(parseType(type));
- }
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
-
- boolean deterministic = row.getBoolean("deterministic");
- String language = row.getString("language");
- String body = row.getString("body");
-
- try
- {
- return create(name, argNames, argTypes, returnType, language, body, deterministic);
- }
- catch (InvalidRequestException e)
- {
- logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e);
- return createBrokenFunction(name, argNames, argTypes, returnType, language, body, e);
- }
- }
-
- private static AbstractType<?> parseType(String str)
- {
- // We only use this when reading the schema where we shouldn't get an error
- try
- {
- return TypeParser.parse(str);
- }
- catch (SyntaxException | ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
@Override
public boolean equals(Object o)
{
@@ -287,19 +196,19 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
return false;
UDFunction that = (UDFunction)o;
- return Objects.equal(this.name, that.name)
- && Functions.typeEquals(this.argTypes, that.argTypes)
- && Functions.typeEquals(this.returnType, that.returnType)
- && Objects.equal(this.argNames, that.argNames)
- && Objects.equal(this.language, that.language)
- && Objects.equal(this.body, that.body)
- && Objects.equal(this.deterministic, that.deterministic);
+ return Objects.equal(name, that.name)
+ && Objects.equal(argNames, that.argNames)
+ && Functions.typeEquals(argTypes, that.argTypes)
+ && Functions.typeEquals(returnType, that.returnType)
+ && Objects.equal(language, that.language)
+ && Objects.equal(body, that.body)
+ && Objects.equal(isDeterministic, that.isDeterministic);
}
@Override
public int hashCode()
{
- return Objects.hashCode(name, argTypes, returnType, argNames, language, body, deterministic);
+ return Objects.hashCode(name, argNames, argTypes, returnType, language, body, isDeterministic);
}
public void userTypeUpdated(String ksName, String typeName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
index 2a17c75..0738cbe 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -31,12 +31,13 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.DataType;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.utils.FBUtilities;
/**
* Helper class for User Defined Functions + Aggregates.
*/
-final class UDHelper
+public final class UDHelper
{
protected static final Logger logger = LoggerFactory.getLogger(UDHelper.class);
@@ -112,12 +113,13 @@ final class UDHelper
// we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by
// using a "signature" UDT that would be comprised of the function name and argument types,
// which we could then use as clustering column. But as we haven't yet used UDT in system tables,
- // We'll left that decision to #6717).
- protected static ByteBuffer computeSignature(List<AbstractType<?>> argTypes)
+ // We'll leave that decision to #6717).
+ public static ByteBuffer calculateSignature(AbstractFunction fun)
{
MessageDigest digest = FBUtilities.newMessageDigest("SHA-1");
- for (AbstractType<?> type : argTypes)
- digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8));
+ digest.update(UTF8Type.instance.decompose(fun.name().name));
+ for (AbstractType<?> type : fun.argTypes())
+ digest.update(UTF8Type.instance.decompose(type.asCQL3Type().toString()));
return ByteBuffer.wrap(digest.digest());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 6aea3b1..c8c2474 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -151,14 +151,32 @@ public class CreateTableStatement extends SchemaAlteringStatement
.addAllColumnDefinitions(getColumns(cfmd))
.isDense(isDense);
- cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
- cfmd.addColumnMetadataFromAliases(columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
+ addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
+ addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
- cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
+ addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
properties.applyToCFMetadata(cfmd);
}
+ private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
+ {
+ if (comparator instanceof CompositeType)
+ {
+ CompositeType ct = (CompositeType)comparator;
+ for (int i = 0; i < aliases.size(); ++i)
+ if (aliases.get(i) != null)
+ cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
+ }
+ else
+ {
+ assert aliases.size() <= 1;
+ if (!aliases.isEmpty() && aliases.get(0) != null)
+ cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind));
+ }
+ }
+
+
public static class RawStatement extends CFStatement
{
private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 1fcd63c..e766f65 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
@@ -59,7 +60,7 @@ import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
*/
public class AtomicBTreeColumns extends ColumnFamily
{
- static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(SystemKeyspace.BuiltIndexesTable, null))
+ static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.denseCFMetaData("keyspace", "table", BytesType.instance), null))
+ ObjectSizes.measure(new Holder(null, null));
// Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index b33e457..e71a62c 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -102,7 +102,7 @@ public class BatchlogManager implements BatchlogManagerMBean
public int countAllBatches()
{
- String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG_TABLE);
+ String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
return (int) executeInternal(query).one().getLong("count");
}
@@ -137,8 +137,8 @@ public class BatchlogManager implements BatchlogManagerMBean
@VisibleForTesting
static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
{
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.BatchlogTable);
- CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.BatchlogTable.comparator.builder().build(), now);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog);
+ CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now);
adder.add("data", serializeMutations(mutations, version))
.add("written_at", new Date(now / 1000))
.add("version", version);
@@ -174,7 +174,7 @@ public class BatchlogManager implements BatchlogManagerMBean
UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
SystemKeyspace.NAME,
- SystemKeyspace.BATCHLOG_TABLE,
+ SystemKeyspace.BATCHLOG,
PAGE_SIZE));
while (!page.isEmpty())
@@ -186,7 +186,7 @@ public class BatchlogManager implements BatchlogManagerMBean
page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
SystemKeyspace.NAME,
- SystemKeyspace.BATCHLOG_TABLE,
+ SystemKeyspace.BATCHLOG,
PAGE_SIZE),
id);
}
@@ -199,7 +199,7 @@ public class BatchlogManager implements BatchlogManagerMBean
private void deleteBatch(UUID id)
{
Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id));
- mutation.delete(SystemKeyspace.BATCHLOG_TABLE, FBUtilities.timestampMicros());
+ mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
mutation.apply();
}
@@ -447,7 +447,7 @@ public class BatchlogManager implements BatchlogManagerMBean
// force flush + compaction to reclaim space from the replayed batches
private void cleanup() throws ExecutionException, InterruptedException
{
- ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG_TABLE);
+ ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
cfs.forceBlockingFlush();
Collection<Descriptor> descriptors = new ArrayList<>();
for (SSTableReader sstr : cfs.getSSTables())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index 5cb62ed..d5ede03 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -46,7 +47,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mut
{
public void runMayThrow() throws Exception
{
- DefsTables.mergeSchema(message.payload);
+ LegacySchemaTables.mergeSchema(message.payload);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
deleted file mode 100644
index 82a5dd1..0000000
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.UTMetaData;
-import org.apache.cassandra.cql3.functions.Functions;
-import org.apache.cassandra.cql3.functions.UDAggregate;
-import org.apache.cassandra.cql3.functions.UDFunction;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
- * load/distribution easy, it replaces old mechanism when local migrations where serialized, stored in system.Migrations
- * and used for schema distribution.
- */
-public class DefsTables
-{
- private static final Logger logger = LoggerFactory.getLogger(DefsTables.class);
-
- /**
- * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_TABLE)
- *
- * @return Collection of found keyspace definitions
- */
- public static Collection<KSMetaData> loadFromKeyspace()
- {
- List<Row> serializedSchema = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE);
-
- List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size());
-
- for (Row row : serializedSchema)
- {
- if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row))
- continue;
-
- keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key), serializedUserTypes(row.key)));
- }
-
- return keyspaces;
- }
-
- private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
- {
- ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE);
- return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
- SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE,
- System.currentTimeMillis())));
- }
-
- private static Row serializedUserTypes(DecoratedKey ksNameKey)
- {
- ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_USER_TYPES_TABLE);
- return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
- SystemKeyspace.SCHEMA_USER_TYPES_TABLE,
- System.currentTimeMillis())));
- }
-
- /**
- * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
- * (which also involves fs operations on add/drop ks/cf)
- *
- * @param mutations the schema changes to apply
- *
- * @throws ConfigurationException If one of metadata attributes has invalid value
- * @throws IOException If data was corrupted during transportation or failed to apply fs operations
- */
- public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException
- {
- mergeSchemaInternal(mutations, true);
- Schema.instance.updateVersionAndAnnounce();
- }
-
- public static synchronized void mergeSchemaInternal(Collection<Mutation> mutations, boolean doFlush) throws IOException
- {
- // compare before/after schemas of the affected keyspaces only
- Set<String> keyspaces = new HashSet<>(mutations.size());
- for (Mutation mutation : mutations)
- keyspaces.add(ByteBufferUtil.string(mutation.key()));
-
- // current state of the schema
- Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
-
- for (Mutation mutation : mutations)
- mutation.apply();
-
- if (doFlush)
- flushSchemaCFs();
-
- // with new data applied
- Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces);
- Map<DecoratedKey, ColumnFamily> newAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces);
-
- Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
- mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
- mergeTypes(oldTypes, newTypes);
- mergeFunctions(oldFunctions, newFunctions);
- mergeAggregates(oldAggregates, newAggregates);
-
- // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
- for (String keyspaceToDrop : keyspacesToDrop)
- dropKeyspace(keyspaceToDrop);
- }
-
- private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<Row> created = new ArrayList<>();
- List<String> altered = new ArrayList<>();
- Set<String> dropped = new HashSet<>();
-
- /*
- * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
- * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
- * there that only has the top-level deletion, if:
- * a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
- * b) a pulled dropped keyspace that got dropped before it could find a way to this node
- * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
- * that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
- * to this node
- */
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.add(new Row(entry.getKey(), entry.getValue()));
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- altered.add(keyspaceName);
- else if (pre.hasColumns())
- dropped.add(keyspaceName);
- else if (post.hasColumns()) // a (re)created keyspace
- created.add(new Row(entry.getKey(), post));
- }
-
- for (Row row : created)
- addKeyspace(KSMetaData.fromSchema(row, Collections.<CFMetaData>emptyList(), new UTMetaData()));
- for (String name : altered)
- updateKeyspace(name);
- return dropped;
- }
-
- // see the comments for mergeKeyspaces()
- private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<CFMetaData> created = new ArrayList<>();
- List<CFMetaData> altered = new ArrayList<>();
- List<CFMetaData> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- {
- MapDifference<String, CFMetaData> delta =
- Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
- KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
- {
- public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
- {
- return pair.rightValue();
- }
- }));
- }
- else if (pre.hasColumns())
- {
- dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
- }
- else if (post.hasColumns())
- {
- created.addAll(KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), post)).values());
- }
- }
-
- for (CFMetaData cfm : created)
- addColumnFamily(cfm);
- for (CFMetaData cfm : altered)
- updateColumnFamily(cfm.ksName, cfm.cfName);
- for (CFMetaData cfm : dropped)
- dropColumnFamily(cfm.ksName, cfm.cfName);
- }
-
- // see the comments for mergeKeyspaces()
- private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<UserType> created = new ArrayList<>();
- List<UserType> altered = new ArrayList<>();
- List<UserType> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- // New keyspace with types
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(UTMetaData.fromSchema(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- {
- MapDifference<ByteBuffer, UserType> delta =
- Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(),
- UTMetaData.fromSchema(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>()
- {
- public UserType apply(MapDifference.ValueDifference<UserType> pair)
- {
- return pair.rightValue();
- }
- }));
- }
- else if (pre.hasColumns())
- {
- dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values());
- }
- else if (post.hasColumns())
- {
- created.addAll(UTMetaData.fromSchema(new Row(entry.getKey(), post)).values());
- }
- }
-
- for (UserType type : created)
- addType(type);
- for (UserType type : altered)
- updateType(type);
- for (UserType type : dropped)
- dropType(type);
- }
-
- // see the comments for mergeKeyspaces()
- private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<UDFunction> created = new ArrayList<>();
- List<UDFunction> altered = new ArrayList<>();
- List<UDFunction> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- // New keyspace with functions
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(UDFunction.fromSchema(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- {
- MapDifference<Composite, UDFunction> delta =
- Maps.difference(UDFunction.fromSchema(new Row(entry.getKey(), pre)),
- UDFunction.fromSchema(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>()
- {
- public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair)
- {
- return pair.rightValue();
- }
- }));
- }
- else if (pre.hasColumns())
- {
- dropped.addAll(UDFunction.fromSchema(new Row(entry.getKey(), pre)).values());
- }
- else if (post.hasColumns())
- {
- created.addAll(UDFunction.fromSchema(new Row(entry.getKey(), post)).values());
- }
- }
-
- for (UDFunction udf : created)
- addFunction(udf);
- for (UDFunction udf : altered)
- updateFunction(udf);
- for (UDFunction udf : dropped)
- dropFunction(udf);
- }
-
- // see the comments for mergeKeyspaces()
- private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
- {
- List<UDAggregate> created = new ArrayList<>();
- List<UDAggregate> altered = new ArrayList<>();
- List<UDAggregate> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- // New keyspace with functions
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- {
- MapDifference<Composite, UDAggregate> delta =
- Maps.difference(UDAggregate.fromSchema(new Row(entry.getKey(), pre)),
- UDAggregate.fromSchema(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
- {
- public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
- {
- return pair.rightValue();
- }
- }));
- }
- else if (pre.hasColumns())
- {
- dropped.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), pre)).values());
- }
- else if (post.hasColumns())
- {
- created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), post)).values());
- }
- }
-
- for (UDAggregate udf : created)
- addAggregate(udf);
- for (UDAggregate udf : altered)
- updateAggregate(udf);
- for (UDAggregate udf : dropped)
- dropAggregate(udf);
- }
-
- private static void addKeyspace(KSMetaData ksm)
- {
- assert Schema.instance.getKSMetaData(ksm.name) == null;
- Schema.instance.load(ksm);
-
- Keyspace.open(ksm.name);
- MigrationManager.instance.notifyCreateKeyspace(ksm);
- }
-
- private static void addColumnFamily(CFMetaData cfm)
- {
- assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null;
- KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
- ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
-
- logger.info("Loading {}", cfm);
-
- Schema.instance.load(cfm);
-
- // make sure it's init-ed w/ the old definitions first,
- // since we're going to call initCf on the new one manually
- Keyspace.open(cfm.ksName);
-
- Schema.instance.setKeyspaceDefinition(ksm);
- Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
- MigrationManager.instance.notifyCreateColumnFamily(cfm);
- }
-
- private static void addType(UserType ut)
- {
- KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
- assert ksm != null;
-
- logger.info("Loading {}", ut);
-
- ksm.userTypes.addType(ut);
-
- MigrationManager.instance.notifyCreateUserType(ut);
- }
-
- private static void addFunction(UDFunction udf)
- {
- logger.info("Loading {}", udf);
-
- Functions.addFunction(udf);
-
- MigrationManager.instance.notifyCreateFunction(udf);
- }
-
- private static void addAggregate(UDAggregate udf)
- {
- logger.info("Loading {}", udf);
-
- Functions.addFunction(udf);
-
- MigrationManager.instance.notifyCreateAggregate(udf);
- }
-
- private static void updateKeyspace(String ksName)
- {
- KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
- assert oldKsm != null;
- KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values());
-
- Schema.instance.setKeyspaceDefinition(newKsm);
-
- Keyspace.open(ksName).createReplicationStrategy(newKsm);
- MigrationManager.instance.notifyUpdateKeyspace(newKsm);
- }
-
- private static void updateColumnFamily(String ksName, String cfName)
- {
- CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
- assert cfm != null;
- cfm.reload();
-
- Keyspace keyspace = Keyspace.open(cfm.ksName);
- keyspace.getColumnFamilyStore(cfm.cfName).reload();
- MigrationManager.instance.notifyUpdateColumnFamily(cfm);
- }
-
- private static void updateType(UserType ut)
- {
- KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
- assert ksm != null;
-
- logger.info("Updating {}", ut);
-
- ksm.userTypes.addType(ut);
-
- MigrationManager.instance.notifyUpdateUserType(ut);
- }
-
- private static void updateFunction(UDFunction udf)
- {
- logger.info("Updating {}", udf);
-
- Functions.replaceFunction(udf);
-
- MigrationManager.instance.notifyUpdateFunction(udf);
- }
-
- private static void updateAggregate(UDAggregate udf)
- {
- logger.info("Updating {}", udf);
-
- Functions.replaceFunction(udf);
-
- MigrationManager.instance.notifyUpdateAggregate(udf);
- }
-
- private static void dropKeyspace(String ksName)
- {
- KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
- String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
-
- CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
-
- Keyspace keyspace = Keyspace.open(ksm.name);
-
- // remove all cfs from the keyspace instance.
- List<UUID> droppedCfs = new ArrayList<>();
- for (CFMetaData cfm : ksm.cfMetaData().values())
- {
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
-
- Schema.instance.purge(cfm);
-
- if (DatabaseDescriptor.isAutoSnapshot())
- cfs.snapshot(snapshotName);
- Keyspace.open(ksm.name).dropCf(cfm.cfId);
-
- droppedCfs.add(cfm.cfId);
- }
-
- // remove the keyspace from the static instances.
- Keyspace.clear(ksm.name);
- Schema.instance.clearKeyspaceDefinition(ksm);
-
- keyspace.writeOrder.awaitNewBarrier();
-
- // force a new segment in the CL
- CommitLog.instance.forceRecycleAllSegments(droppedCfs);
-
- MigrationManager.instance.notifyDropKeyspace(ksm);
- }
-
- private static void dropColumnFamily(String ksName, String cfName)
- {
- KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
- assert ksm != null;
- ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
- assert cfs != null;
-
- // reinitialize the keyspace.
- CFMetaData cfm = ksm.cfMetaData().get(cfName);
-
- Schema.instance.purge(cfm);
- Schema.instance.setKeyspaceDefinition(makeNewKeyspaceDefinition(ksm, cfm));
-
- CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
-
- if (DatabaseDescriptor.isAutoSnapshot())
- cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
- Keyspace.open(ksm.name).dropCf(cfm.cfId);
- MigrationManager.instance.notifyDropColumnFamily(cfm);
-
- CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
- }
-
- private static void dropType(UserType ut)
- {
- KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
- assert ksm != null;
-
- ksm.userTypes.removeType(ut);
-
- MigrationManager.instance.notifyDropUserType(ut);
- }
-
- private static void dropFunction(UDFunction udf)
- {
- logger.info("Drop {}", udf);
-
- // TODO: this is kind of broken as this remove all overloads of the function name
- Functions.removeFunction(udf.name(), udf.argTypes());
-
- MigrationManager.instance.notifyDropFunction(udf);
- }
-
- private static void dropAggregate(UDAggregate udf)
- {
- logger.info("Drop {}", udf);
-
- // TODO: this is kind of broken as this remove all overloads of the function name
- Functions.removeFunction(udf.name(), udf.argTypes());
-
- MigrationManager.instance.notifyDropAggregate(udf);
- }
-
- private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
- {
- // clone ksm but do not include the new def
- List<CFMetaData> newCfs = new ArrayList<>(ksm.cfMetaData().values());
- newCfs.remove(toExclude);
- assert newCfs.size() == ksm.cfMetaData().size() - 1;
- return KSMetaData.cloneWith(ksm, newCfs);
- }
-
- private static void flushSchemaCFs()
- {
- for (String cf : SystemKeyspace.ALL_SCHEMA_TABLES)
- SystemKeyspace.forceBlockingFlush(cf);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 081e01b..8c4477b 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -115,7 +115,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
"internal");
- private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS_TABLE);
+ private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
/**
* Returns a mutation representing a Hint to be sent to <code>targetId</code>
@@ -134,9 +134,9 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
- CellName name = SystemKeyspace.HintsTable.comparator.makeCellName(hintId, MessagingService.current_version);
+ CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version);
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS_TABLE));
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS));
cf.addColumn(name, value, now, ttl);
return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf);
}
@@ -182,7 +182,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
{
Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes);
- mutation.delete(SystemKeyspace.HINTS_TABLE, columnName, timestamp);
+ mutation.delete(SystemKeyspace.HINTS, columnName, timestamp);
mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
@@ -207,7 +207,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes);
- mutation.delete(SystemKeyspace.HINTS_TABLE, System.currentTimeMillis());
+ mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
@@ -241,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
try
{
logger.info("Truncating all stored hints.");
- Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS_TABLE).truncateBlocking();
+ Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS).truncateBlocking();
}
catch (Exception e)
{
@@ -375,7 +375,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
{
long now = System.currentTimeMillis();
QueryFilter filter = QueryFilter.getSliceFilter(epkey,
- SystemKeyspace.HINTS_TABLE,
+ SystemKeyspace.HINTS,
startColumn,
Composites.EMPTY,
false,
@@ -601,7 +601,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
try
{
RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME,
- SystemKeyspace.HINTS_TABLE,
+ SystemKeyspace.HINTS,
System.currentTimeMillis(),
predicate,
range,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 09fc338..b34d589 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -32,7 +32,6 @@ import java.util.concurrent.Future;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.QueryPagers;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 4cf441e..2381f26 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -357,7 +357,7 @@ public class Memtable
// and BL data is strictly local, so we don't need to preserve tombstones for repair.
// If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
// See CASSANDRA-4667.
- if (cfs.name.equals(SystemKeyspace.BATCHLOG_TABLE) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
+ if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index d4503ba..79753c1 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.service.MigrationManager;
/**
@@ -40,7 +41,7 @@ public class MigrationRequestVerbHandler implements IVerbHandler
{
logger.debug("Received migration request from {}.", message.from);
MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
- SystemKeyspace.serializeSchema(),
+ LegacySchemaTables.convertSchemaToMutations(),
MigrationManager.MigrationsSerializer.instance);
MessagingService.instance().sendReply(response, id, message.from);
}