You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/25 02:16:53 UTC
[1/4] git commit: Allow concurrent schema migrations patch by Pavel
Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-1391
Updated Branches:
refs/heads/trunk e594e0dca -> 37b079352
Allow concurrent schema migrations
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-1391
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37b07935
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37b07935
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37b07935
Branch: refs/heads/trunk
Commit: 37b079352d412bb67036aa4130107728b9c8ae0d
Parents: e594e0d
Author: Pavel Yaskevich <po...@gmail.com>
Authored: Sun Jan 22 02:46:02 2012 +0200
Committer: Pavel Yaskevich <po...@gmail.com>
Committed: Wed Jan 25 04:12:36 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 343 ++++++++++---
.../apache/cassandra/config/ColumnDefinition.java | 180 ++++++-
.../cassandra/config/DatabaseDescriptor.java | 96 ++--
.../org/apache/cassandra/config/KSMetaData.java | 234 ++++++++-
src/java/org/apache/cassandra/config/Schema.java | 137 ++++--
.../apache/cassandra/cql/AlterTableStatement.java | 45 +-
.../apache/cassandra/cql/DropIndexStatement.java | 17 +-
.../org/apache/cassandra/cql/QueryProcessor.java | 47 +--
src/java/org/apache/cassandra/db/ColumnFamily.java | 19 +-
.../cassandra/db/DefinitionsUpdateVerbHandler.java | 67 +--
src/java/org/apache/cassandra/db/DefsTable.java | 391 +++++++++++++--
.../cassandra/db/MigrationRequestVerbHandler.java | 53 ++
src/java/org/apache/cassandra/db/SystemTable.java | 118 +++++-
.../cassandra/db/migration/AddColumnFamily.java | 95 +---
.../apache/cassandra/db/migration/AddKeyspace.java | 69 +--
.../cassandra/db/migration/DropColumnFamily.java | 89 +---
.../cassandra/db/migration/DropKeyspace.java | 61 +--
.../apache/cassandra/db/migration/Migration.java | 269 ++---------
.../cassandra/db/migration/MigrationHelper.java | 371 ++++++++++++++
.../cassandra/db/migration/UpdateColumnFamily.java | 89 +---
.../cassandra/db/migration/UpdateKeyspace.java | 81 +---
src/java/org/apache/cassandra/io/SerDeUtils.java | 124 -----
.../cassandra/service/AbstractCassandraDaemon.java | 14 -
.../apache/cassandra/service/MigrationManager.java | 304 ++++++------
.../apache/cassandra/service/StorageService.java | 14 +-
.../apache/cassandra/thrift/CassandraServer.java | 72 +---
.../org/apache/cassandra/utils/FBUtilities.java | 27 +-
.../serialization/0.7/db.migration.Keyspace1.bin | 1 -
.../serialization/0.7/db.migration.Keyspace2.bin | 1 -
.../serialization/0.7/db.migration.Keyspace3.bin | Bin 10498 -> 0 bytes
.../serialization/0.7/db.migration.Keyspace4.bin | 1 -
.../serialization/0.7/db.migration.Keyspace5.bin | 1 -
test/unit/org/apache/cassandra/SchemaLoader.java | 2 +-
.../apache/cassandra/config/CFMetaDataTest.java | 35 +-
.../cassandra/config/ColumnDefinitionTest.java | 2 +-
.../cassandra/config/DatabaseDescriptorTest.java | 26 +-
test/unit/org/apache/cassandra/db/DefsTest.java | 112 +----
.../cassandra/db/migration/SerializationsTest.java | 77 ---
39 files changed, 2085 insertions(+), 1600 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3e5af50..c7a228f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,7 @@
* Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749)
* Fix BulkLoader to support new SSTable layout and add stream
throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
+ * Allow concurrent schema migrations (CASSANDRA-1391)
1.0.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4c05e75..1afa04f 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -18,30 +18,38 @@
package org.apache.cassandra.config;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Objects;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.avro.util.Utf8;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
+
public final class CFMetaData
{
//
@@ -63,11 +71,51 @@ public final class CFMetaData
public static final CFMetaData StatusCf = newSystemMetadata(SystemTable.STATUS_CF, 0, "persistent metadata for the local node", BytesType.instance, null);
public static final CFMetaData HintsCf = newSystemMetadata(HintedHandOffManager.HINTS_CF, 1, "hinted handoff data", BytesType.instance, BytesType.instance);
+ @Deprecated
public static final CFMetaData MigrationsCf = newSystemMetadata(Migration.MIGRATIONS_CF, 2, "individual schema mutations", TimeUUIDType.instance, null);
+ @Deprecated
public static final CFMetaData SchemaCf = newSystemMetadata(Migration.SCHEMA_CF, 3, "current state of the schema", UTF8Type.instance, null);
public static final CFMetaData IndexCf = newSystemMetadata(SystemTable.INDEX_CF, 5, "indexes that have been completed", UTF8Type.instance, null);
public static final CFMetaData NodeIdCf = newSystemMetadata(SystemTable.NODE_ID_CF, 6, "nodeId and their metadata", TimeUUIDType.instance, null);
public static final CFMetaData VersionCf = newSystemMetadata(SystemTable.VERSION_CF, 7, "server version information", UTF8Type.instance, null);
+ public static final CFMetaData SchemaKeyspacesCf = schemaCFDefinition(SystemTable.SCHEMA_KEYSPACES_CF, 8, "keyspace attributes of the schema", AsciiType.instance, 1);
+ public static final CFMetaData SchemaColumnFamiliesCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNFAMILIES_CF, 9, "ColumnFamily attributes of the schema", AsciiType.instance, 2);
+ public static final CFMetaData SchemaColumnsCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNS_CF, 10, "ColumnFamily column attributes of the schema", AsciiType.instance, 3);
+
+ private static CFMetaData schemaCFDefinition(String name, int index, String comment, AbstractType<?> comp, int nestingLevel)
+ {
+ try
+ {
+ AbstractType<?> comparator;
+
+ if (nestingLevel == 1)
+ {
+ comparator = comp;
+ }
+ else
+ {
+ List<AbstractType<?>> composite = new ArrayList<AbstractType<?>>(nestingLevel);
+
+ for (int i = 0; i < nestingLevel; i++)
+ composite.add(comp);
+
+ comparator = CompositeType.getInstance(composite);
+ }
+
+ return newSystemMetadata(name,
+ index,
+ comment,
+ comparator,
+ null)
+ .keyValidator(AsciiType.instance)
+ .defaultValidator(UTF8Type.instance);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
static
{
try
@@ -277,47 +325,7 @@ public final class CFMetaData
return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
}
- // converts CFM to avro CfDef
- public org.apache.cassandra.db.migration.avro.CfDef toAvro()
- {
- org.apache.cassandra.db.migration.avro.CfDef cf = new org.apache.cassandra.db.migration.avro.CfDef();
- cf.id = cfId;
- cf.keyspace = new Utf8(ksName);
- cf.name = new Utf8(cfName);
- cf.column_type = new Utf8(cfType.name());
- cf.comparator_type = new Utf8(comparator.toString());
- if (subcolumnComparator != null)
- {
- assert cfType == ColumnFamilyType.Super
- : String.format("%s CF %s should not have subcomparator %s defined", cfType, cfName, subcolumnComparator);
- cf.subcomparator_type = new Utf8(subcolumnComparator.toString());
- }
- cf.comment = new Utf8(enforceCommentNotNull(comment));
- cf.read_repair_chance = readRepairChance;
- cf.replicate_on_write = replicateOnWrite;
- cf.gc_grace_seconds = gcGraceSeconds;
- cf.default_validation_class = defaultValidator == null ? null : new Utf8(defaultValidator.toString());
- cf.key_validation_class = new Utf8(keyValidator.toString());
- cf.min_compaction_threshold = minCompactionThreshold;
- cf.max_compaction_threshold = maxCompactionThreshold;
- cf.merge_shards_chance = mergeShardsChance;
- cf.key_alias = keyAlias;
- cf.column_metadata = new ArrayList<ColumnDef>(column_metadata.size());
- for (ColumnDefinition cd : column_metadata.values())
- cf.column_metadata.add(cd.toAvro());
- cf.compaction_strategy = new Utf8(compactionStrategyClass.getName());
- if (compactionStrategyOptions != null)
- {
- cf.compaction_strategy_options = new HashMap<CharSequence, CharSequence>();
- for (Map.Entry<String, String> e : compactionStrategyOptions.entrySet())
- cf.compaction_strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
- }
- cf.compression_options = compressionParameters.asAvroOptions();
- cf.bloom_filter_fp_chance = bloomFilterFpChance;
- cf.caching = new Utf8(caching.toString());
- return cf;
- }
-
+ @Deprecated
public static CFMetaData fromAvro(org.apache.cassandra.db.migration.avro.CfDef cf)
{
AbstractType<?> comparator;
@@ -338,7 +346,7 @@ public final class CFMetaData
throw new RuntimeException("Could not inflate CFMetaData for " + cf, ex);
}
Map<ByteBuffer, ColumnDefinition> column_metadata = new TreeMap<ByteBuffer, ColumnDefinition>(BytesType.instance);
- for (ColumnDef aColumn_metadata : cf.column_metadata)
+ for (org.apache.cassandra.db.migration.avro.ColumnDef aColumn_metadata : cf.column_metadata)
{
ColumnDefinition cd = ColumnDefinition.fromAvro(aColumn_metadata);
if (cd.getIndexType() != null && cd.getIndexName() == null)
@@ -627,22 +635,45 @@ public final class CFMetaData
.validate();
}
- /** updates CFMetaData in-place to match cf_def */
- public void apply(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
+ public void reload() throws IOException
+ {
+ Row cfDefRow = SystemTable.readSchemaRow(ksName, cfName);
+
+ if (cfDefRow.cf == null || cfDefRow.cf.isEmpty())
+ throw new IOException(String.format("%s not found in the schema definitions table.", ksName + ":" + cfName));
+
+ try
+ {
+ apply(fromSchema(cfDefRow.cf));
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Updates CFMetaData in-place to match cf_def
+ *
+ * *Note*: This method left public only for DefsTest, don't use directly!
+ *
+ * @throws ConfigurationException if ks/cf names or cf ids didn't match
+ */
+ public void apply(CfDef cf_def) throws ConfigurationException
{
logger.debug("applying {} to {}", cf_def, this);
// validate
- if (!cf_def.keyspace.toString().equals(ksName))
+ if (!cf_def.keyspace.equals(ksName))
throw new ConfigurationException(String.format("Keyspace mismatch (found %s; expected %s)",
cf_def.keyspace, ksName));
- if (!cf_def.name.toString().equals(cfName))
+ if (!cf_def.name.equals(cfName))
throw new ConfigurationException(String.format("Column family mismatch (found %s; expected %s)",
cf_def.name, cfName));
- if (!cf_def.id.equals(cfId))
+ if (cf_def.id != cfId)
throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)",
cf_def.id, cfId));
- if (!cf_def.column_type.toString().equals(cfType.name()))
+ if (!cf_def.column_type.equals(cfType.name()))
throw new ConfigurationException("types do not match.");
if (comparator != TypeParser.parse(cf_def.comparator_type))
throw new ConfigurationException("comparators do not match.");
@@ -667,15 +698,18 @@ public final class CFMetaData
maxCompactionThreshold = cf_def.max_compaction_threshold;
mergeShardsChance = cf_def.merge_shards_chance;
keyAlias = cf_def.key_alias;
- if (cf_def.bloom_filter_fp_chance != null)
+ if (cf_def.isSetBloom_filter_fp_chance())
bloomFilterFpChance = cf_def.bloom_filter_fp_chance;
- caching = Caching.fromString(cf_def.caching.toString());
+ caching = Caching.fromString(cf_def.caching);
+
+ if (!cf_def.isSetColumn_metadata())
+ cf_def.setColumn_metadata(new ArrayList<ColumnDef>());
// adjust column definitions. figure out who is coming and going.
Set<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
Set<ByteBuffer> newColumns = new HashSet<ByteBuffer>();
- Set<org.apache.cassandra.db.migration.avro.ColumnDef> toAdd = new HashSet<org.apache.cassandra.db.migration.avro.ColumnDef>();
- for (org.apache.cassandra.db.migration.avro.ColumnDef def : cf_def.column_metadata)
+ Set<ColumnDef> toAdd = new HashSet<ColumnDef>();
+ for (ColumnDef def : cf_def.column_metadata)
{
newColumns.add(def.name);
if (!column_metadata.containsKey(def.name))
@@ -691,36 +725,36 @@ public final class CFMetaData
column_metadata.remove(indexName);
}
// update the ones staying
- for (org.apache.cassandra.db.migration.avro.ColumnDef def : cf_def.column_metadata)
+ for (ColumnDef def : cf_def.column_metadata)
{
ColumnDefinition oldDef = column_metadata.get(def.name);
if (oldDef == null)
continue;
oldDef.setValidator(TypeParser.parse(def.validation_class));
- oldDef.setIndexType(def.index_type == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(def.index_type.name()),
- ColumnDefinition.getStringMap(def.index_options));
- oldDef.setIndexName(def.index_name == null ? null : def.index_name.toString());
+ oldDef.setIndexType(def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
+ def.index_options);
+ oldDef.setIndexName(def.index_name == null ? null : def.index_name);
}
// add the new ones coming in.
- for (org.apache.cassandra.db.migration.avro.ColumnDef def : toAdd)
+ for (ColumnDef def : toAdd)
{
AbstractType<?> dValidClass = TypeParser.parse(def.validation_class);
ColumnDefinition cd = new ColumnDefinition(def.name,
dValidClass,
- def.index_type == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(def.index_type.toString()),
- ColumnDefinition.getStringMap(def.index_options),
- def.index_name == null ? null : def.index_name.toString());
+ def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
+ def.index_options,
+ def.index_name == null ? null : def.index_name);
column_metadata.put(cd.name, cd);
}
if (cf_def.compaction_strategy != null)
- compactionStrategyClass = createCompactionStrategy(cf_def.compaction_strategy.toString());
+ compactionStrategyClass = createCompactionStrategy(cf_def.compaction_strategy);
if (null != cf_def.compaction_strategy_options)
{
compactionStrategyOptions = new HashMap<String, String>();
- for (Map.Entry<CharSequence, CharSequence> e : cf_def.compaction_strategy_options.entrySet())
- compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
+ for (Map.Entry<String, String> e : cf_def.compaction_strategy_options.entrySet())
+ compactionStrategyOptions.put(e.getKey(), e.getValue());
}
compressionParameters = CompressionParameters.create(cf_def.compression_options);
@@ -749,9 +783,7 @@ public final class CFMetaData
ColumnFamilyStore.class,
Map.class // options
});
- return (AbstractCompactionStrategy)constructor.newInstance(new Object[] {
- cfs,
- compactionStrategyOptions});
+ return (AbstractCompactionStrategy)constructor.newInstance(cfs, compactionStrategyOptions);
}
catch (NoSuchMethodException e)
{
@@ -788,7 +820,7 @@ public final class CFMetaData
def.setRead_repair_chance(readRepairChance);
def.setReplicate_on_write(replicateOnWrite);
def.setGc_grace_seconds(gcGraceSeconds);
- def.setDefault_validation_class(defaultValidator.toString());
+ def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
def.setKey_validation_class(keyValidator.toString());
def.setMin_compaction_threshold(minCompactionThreshold);
def.setMax_compaction_threshold(maxCompactionThreshold);
@@ -815,9 +847,9 @@ public final class CFMetaData
return def;
}
- public static void validateMinMaxCompactionThresholds(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
+ public static void validateMinMaxCompactionThresholds(CfDef cf_def) throws ConfigurationException
{
- if (cf_def.min_compaction_threshold != null && cf_def.max_compaction_threshold != null)
+ if (cf_def.isSetMin_compaction_threshold() && cf_def.isSetMax_compaction_threshold())
{
if ((cf_def.min_compaction_threshold > cf_def.max_compaction_threshold) &&
cf_def.max_compaction_threshold != 0)
@@ -825,15 +857,15 @@ public final class CFMetaData
throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold");
}
}
- else if (cf_def.min_compaction_threshold != null)
+ else if (cf_def.isSetMin_compaction_threshold())
{
if (cf_def.min_compaction_threshold > DEFAULT_MAX_COMPACTION_THRESHOLD)
{
- throw new ConfigurationException("min_compaction_threshold cannot be greather than max_compaction_threshold (default " +
+ throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold (default " +
DEFAULT_MAX_COMPACTION_THRESHOLD + ")");
}
}
- else if (cf_def.max_compaction_threshold != null)
+ else if (cf_def.isSetMax_compaction_threshold())
{
if (cf_def.max_compaction_threshold < DEFAULT_MIN_COMPACTION_THRESHOLD && cf_def.max_compaction_threshold != 0) {
throw new ConfigurationException("max_compaction_threshold cannot be less than min_compaction_threshold");
@@ -923,6 +955,167 @@ public final class CFMetaData
return this;
}
+ /**
+ * Calculate the difference between current metadata and given and serialize it as schema RowMutation
+ *
+ * @param newState The new metadata (for the same CF)
+ * @param modificationTimestamp Timestamp to use for mutation
+ *
+ * @return Difference between attributes in form of schema mutation
+ *
+ * @throws ConfigurationException if any of the attributes didn't pass validation
+ */
+ public RowMutation diff(CfDef newState, long modificationTimestamp) throws ConfigurationException
+ {
+ CfDef curState = toThrift();
+ RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+
+ for (CfDef._Fields field : CfDef._Fields.values())
+ {
+ if (field.equals(CfDef._Fields.COLUMN_METADATA))
+ continue; // deal with columns after main attributes
+
+ Object curValue = curState.getFieldValue(field);
+ Object newValue = newState.getFieldValue(field);
+
+ if (Objects.equal(curValue, newValue))
+ continue;
+
+ m.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(curState.name, field.getFieldName())),
+ valueAsBytes(newValue),
+ modificationTimestamp);
+ }
+
+ AbstractType nameComparator = cfType.equals(ColumnFamilyType.Super)
+ ? subcolumnComparator
+ : comparator;
+
+ MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, ColumnDefinition.fromThrift(newState.column_metadata));
+ Map<ByteBuffer, ColumnDef> columnDefMap = ColumnDefinition.toMap(newState.column_metadata);
+
+ // columns that are no longer needed
+ for (ByteBuffer name : columnDiff.entriesOnlyOnLeft().keySet())
+ ColumnDefinition.deleteFromSchema(m, curState.name, nameComparator, name, modificationTimestamp);
+
+ // newly added columns
+ for (ByteBuffer name : columnDiff.entriesOnlyOnRight().keySet())
+ ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+
+ // old columns with updated attributes
+ for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+ ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+
+ return m;
+ }
+
+ /**
+ * Remove all CF attributes from schema
+ *
+ * @param timestamp Timestamp to use
+ *
+ * @return RowMutation to use to completely remove cf from schema
+ */
+ public RowMutation dropFromSchema(long timestamp)
+ {
+ RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+
+ for (CfDef._Fields field : CfDef._Fields.values())
+ m.delete(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfName, field.getFieldName())), timestamp);
+
+ for (ColumnDefinition columnDefinition : column_metadata.values())
+ ColumnDefinition.deleteFromSchema(m, cfName, comparator, columnDefinition.name, timestamp);
+
+ return m;
+ }
+
+ /**
+ * Convert current metadata into schema mutation
+ *
+ * @param timestamp Timestamp to use
+ *
+ * @return Low-level representation of the CF
+ *
+ * @throws ConfigurationException if any of the attributes didn't pass validation
+ */
+ public RowMutation toSchema(long timestamp) throws ConfigurationException
+ {
+ RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+
+ toSchema(mutation, toThrift(), timestamp);
+
+ return mutation;
+ }
+
+ /**
+ * Convert given Thrift-serialized metadata into schema mutation
+ *
+ * @param mutation The mutation to include ColumnFamily attributes into (can contain keyspace attributes already)
+ * @param cfDef Thrift-serialized metadata to use as source for schema mutation
+ * @param timestamp Timestamp to use
+ *
+ * @throws ConfigurationException if any of the attributes didn't pass validation
+ */
+ public static void toSchema(RowMutation mutation, CfDef cfDef, long timestamp) throws ConfigurationException
+ {
+ applyImplicitDefaults(cfDef);
+
+ for (CfDef._Fields field : CfDef._Fields.values())
+ {
+ if (field.equals(CfDef._Fields.COLUMN_METADATA))
+ continue;
+
+ mutation.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfDef.name, field.getFieldName())),
+ valueAsBytes(cfDef.getFieldValue(field)),
+ timestamp);
+ }
+
+ if (!cfDef.isSetColumn_metadata())
+ return;
+
+ AbstractType comparator = TypeParser.parse(cfDef.column_type.equals("Super")
+ ? cfDef.subcomparator_type
+ : cfDef.comparator_type);
+
+ for (ColumnDef columnDef : cfDef.column_metadata)
+ ColumnDefinition.addToSchema(mutation, cfDef.name, comparator, columnDef, timestamp);
+ }
+
+ /**
+ * Deserialize CF metadata from low-level representation
+ *
+ * @param serializedCfDef The data to use for deserialization
+ *
+ * @return Thrift-based metadata deserialized from schema
+ *
+ * @throws IOException on any I/O related error
+ */
+ public static CfDef fromSchema(ColumnFamily serializedCfDef) throws IOException
+ {
+ assert serializedCfDef != null;
+
+ CfDef cfDef = new CfDef();
+
+ AbstractType sysComparator = serializedCfDef.getComparator();
+
+ for (IColumn cfAttr : serializedCfDef.getSortedColumns())
+ {
+ if (cfAttr == null || cfAttr.isMarkedForDelete())
+ continue;
+
+ // column name format is <cf>:<attribute name>
+ String[] attr = sysComparator.getString(cfAttr.name()).split(":");
+ assert attr.length == 2;
+
+ CfDef._Fields field = CfDef._Fields.findByName(attr[1]);
+ cfDef.setFieldValue(field, deserializeValue(cfAttr.value(), getValueClass(CfDef.class, field.getFieldName())));
+ }
+
+ for (ColumnDef columnDef : ColumnDefinition.fromSchema(cfDef.keyspace, cfDef.name))
+ cfDef.addToColumn_metadata(columnDef);
+
+ return cfDef;
+ }
+
@Override
public String toString()
{
@@ -951,4 +1144,4 @@ public final class CFMetaData
.append("caching", caching)
.toString();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 115bc2a..d8c9db9 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -24,16 +24,22 @@ package org.apache.cassandra.config;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.avro.util.Utf8;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.migration.MigrationHelper;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
+
public class ColumnDefinition
{
-
public final ByteBuffer name;
private AbstractType<?> validator;
private IndexType index_type;
@@ -80,19 +86,7 @@ public class ColumnDefinition
return result;
}
- public org.apache.cassandra.db.migration.avro.ColumnDef toAvro()
- {
- org.apache.cassandra.db.migration.avro.ColumnDef cd = new org.apache.cassandra.db.migration.avro.ColumnDef();
- cd.name = ByteBufferUtil.clone(name);
- cd.validation_class = new Utf8(validator.toString());
- cd.index_type = index_type == null
- ? null
- : org.apache.cassandra.db.migration.avro.IndexType.valueOf(index_type.name());
- cd.index_name = index_name == null ? null : new Utf8(index_name);
- cd.index_options = getCharSequenceMap(index_options);
- return cd;
- }
-
+ @Deprecated
public static ColumnDefinition fromAvro(org.apache.cassandra.db.migration.avro.ColumnDef cd)
{
IndexType index_type = cd.index_type == null ? null : Enum.valueOf(IndexType.class, cd.index_type.name());
@@ -108,6 +102,22 @@ public class ColumnDefinition
}
}
+ public ColumnDef toThrift()
+ {
+ ColumnDef cd = new ColumnDef();
+
+ cd.setName(ByteBufferUtil.clone(name));
+ cd.setValidation_class(validator.toString());
+
+ cd.setIndex_type(index_type == null
+ ? null
+ : IndexType.valueOf(index_type.name()));
+ cd.setIndex_name(index_name == null ? null : index_name);
+ cd.setIndex_options(index_options == null ? null : Maps.newHashMap(index_options));
+
+ return cd;
+ }
+
public static ColumnDefinition fromThrift(ColumnDef thriftColumnDef) throws ConfigurationException
{
return new ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name),
@@ -129,6 +139,133 @@ public class ColumnDefinition
return cds;
}
+ public static Map<ByteBuffer, ColumnDef> toMap(List<ColumnDef> columnDefs)
+ {
+ Map<ByteBuffer, ColumnDef> map = new HashMap<ByteBuffer, ColumnDef>();
+
+ if (columnDefs == null)
+ return map;
+
+ for (ColumnDef columnDef : columnDefs)
+ map.put(columnDef.name, columnDef);
+
+ return map;
+ }
+
+ /**
+ * Drop specified column from the schema using given row mutation.
+ *
+ * @param mutation The schema row mutation
+ * @param cfName The name of the parent ColumnFamily
+ * @param comparator The comparator to serialize column name in human-readable format
+ * @param columnName The column name as String
+ * @param timestamp The timestamp to use for column modification
+ */
+ public static void deleteFromSchema(RowMutation mutation, String cfName, AbstractType comparator, ByteBuffer columnName, long timestamp)
+ {
+ toSchema(mutation, comparator, cfName, columnName, null, timestamp, true);
+ }
+
+ /**
+ * Add new/update column to/in the schema.
+ *
+ * @param mutation The schema row mutation
+ * @param cfName The name of the parent ColumnFamily
+ * @param comparator The comparator to serialize column name in human-readable format
+ * @param columnDef The Thrift-based column definition that contains all attributes
+ * @param timestamp The timestamp to use for column modification
+ */
+ public static void addToSchema(RowMutation mutation, String cfName, AbstractType comparator, ColumnDef columnDef, long timestamp)
+ {
+ toSchema(mutation, comparator, cfName, columnDef.name, columnDef, timestamp, false);
+ }
+
+ /**
+ * Serialize given ColumnDef into given schema row mutation to add or drop it.
+ *
+ * @param mutation The mutation to use for serialization
+ * @param comparator The comparator to serialize column name in human-readable format
+ * @param cfName The name of the parent ColumnFamily
+ * @param columnName The column name as String
+ * @param columnDef The Thrift-based column definition that contains all attributes
+ * @param timestamp The timestamp to use for column modification
+ * @param delete The flag which indicates if column should be deleted or added to the schema
+ */
+ private static void toSchema(RowMutation mutation, AbstractType comparator, String cfName, ByteBuffer columnName, ColumnDef columnDef, long timestamp, boolean delete)
+ {
+ for (ColumnDef._Fields field : ColumnDef._Fields.values())
+ {
+ QueryPath path = new QueryPath(SystemTable.SCHEMA_COLUMNS_CF,
+ null,
+ compositeNameFor(cfName,
+ readableColumnName(columnName, comparator),
+ field.getFieldName()));
+
+ if (delete)
+ mutation.delete(path, timestamp);
+ else
+ mutation.add(path, valueAsBytes(columnDef.getFieldValue(field)), timestamp);
+ }
+ }
+
+ /**
+ * Deserialize columns from low-level representation
+ *
+ * @param ksName The corresponding Keyspace
+ * @param cfName The name of the parent ColumnFamily
+ *
+ * @return Thrift-based deserialized representation of the column
+ */
+ public static List<ColumnDef> fromSchema(String ksName, String cfName)
+ {
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(SystemTable.getSchemaKSKey(ksName));
+ ColumnFamilyStore columnsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF);
+ ColumnFamily columns = columnsStore.getColumnFamily(key,
+ new QueryPath(SystemTable.SCHEMA_COLUMNS_CF),
+ MigrationHelper.searchComposite(cfName, true),
+ MigrationHelper.searchComposite(cfName, false),
+ false,
+ Integer.MAX_VALUE);
+
+ if (columns == null || columns.isEmpty())
+ return Collections.emptyList();
+
+ // contenders to be a valid columns, re-check is done after all attributes
+ // were read from serialized state, if ColumnDef has all required fields it gets promoted to be returned
+ Map<String, ColumnDef> contenders = new HashMap<String, ColumnDef>();
+
+ for (IColumn column : columns.getSortedColumns())
+ {
+ if (column.isMarkedForDelete())
+ continue;
+
+ // column name format <cf>:<column name>:<attribute name>
+ String[] components = columns.getComparator().getString(column.name()).split(":");
+ assert components.length == 3;
+
+ ColumnDef columnDef = contenders.get(components[1]);
+
+ if (columnDef == null)
+ {
+ columnDef = new ColumnDef();
+ contenders.put(components[1], columnDef);
+ }
+
+ ColumnDef._Fields field = ColumnDef._Fields.findByName(components[2]);
+ columnDef.setFieldValue(field, deserializeValue(column.value(), getValueClass(ColumnDef.class, field.getFieldName())));
+ }
+
+ List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+
+ for (ColumnDef columnDef : contenders.values())
+ {
+ if (columnDef.isSetName() && columnDef.isSetValidation_class())
+ columnDefs.add(columnDef);
+ }
+
+ return columnDefs;
+ }
+
@Override
public String toString()
{
@@ -189,17 +326,4 @@ public class ColumnDefinition
return stringMap;
}
-
- private static Map<CharSequence, CharSequence> getCharSequenceMap(Map<String,String> stringMap)
- {
- if (stringMap == null)
- return null;
-
- Map<CharSequence, CharSequence> charMap = new HashMap<CharSequence, CharSequence>();
-
- for (Map.Entry<String, String> entry : stringMap.entrySet())
- charMap.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
-
- return charMap;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9f41a84..e301308 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -36,7 +36,9 @@ import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.IAuthority;
import org.apache.cassandra.cache.IRowCacheProvider;
import org.apache.cassandra.config.Config.RequestSchedulerId;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.FileUtils;
@@ -425,6 +427,9 @@ public class DatabaseDescriptor
Schema.instance.load(CFMetaData.IndexCf);
Schema.instance.load(CFMetaData.NodeIdCf);
Schema.instance.load(CFMetaData.VersionCf);
+ Schema.instance.load(CFMetaData.SchemaKeyspacesCf);
+ Schema.instance.load(CFMetaData.SchemaColumnFamiliesCf);
+ Schema.instance.load(CFMetaData.SchemaColumnsCf);
Schema.instance.addSystemTable(systemMeta);
@@ -471,61 +476,72 @@ public class DatabaseDescriptor
/** load keyspace (table) definitions, but do not initialize the table instances. */
public static void loadSchemas() throws IOException
{
- // we can load tables from local storage if a version is set in the system table and that acutally maps to
- // real data in the definitions table. If we do end up loading from xml, store the defintions so that we
- // don't load from xml anymore.
- UUID uuid = Migration.getLastMigrationId();
- if (uuid == null)
+ ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF);
+
+ // if table with definitions is empty try loading the old way
+ if (schemaCFS.estimateKeys() == 0)
{
- logger.info("Couldn't detect any schema definitions in local storage.");
- // peek around the data directories to see if anything is there.
- boolean hasExistingTables = false;
- for (String dataDir : getAllDataFileLocations())
+ // we can load tables from local storage if a version is set in the system table and that actually maps to
+ // real data in the definitions table. If we do end up loading from xml, store the definitions so that we
+ // don't load from xml anymore.
+ UUID uuid = Migration.getLastMigrationId();
+
+ if (uuid == null)
+ {
+ logger.info("Couldn't detect any schema definitions in local storage.");
+ // peek around the data directories to see if anything is there.
+ if (hasExistingNoSystemTables())
+ logger.info("Found table data in data directories. Consider using the CLI to define your schema.");
+ else
+ logger.info("To create keyspaces and column families, see 'help create keyspace' in the CLI, or set up a schema using the thrift system_* calls.");
+ }
+ else
{
- File dataPath = new File(dataDir);
- if (dataPath.exists() && dataPath.isDirectory())
+ logger.info("Loading schema version " + uuid.toString());
+ Collection<KSMetaData> tableDefs = DefsTable.loadFromStorage(uuid);
+
+ // happens when someone manually deletes all tables and restarts.
+ if (tableDefs.size() == 0)
{
- // see if there are other directories present.
- int dirCount = dataPath.listFiles(new FileFilter()
- {
- public boolean accept(File pathname)
- {
- return pathname.isDirectory();
- }
- }).length;
- if (dirCount > 0)
- hasExistingTables = true;
+ logger.warn("No schema definitions were found in local storage.");
}
- if (hasExistingTables)
+ else // if non-system tables where found, trying to load them
{
- break;
+ Schema.instance.load(tableDefs);
}
}
-
- if (hasExistingTables)
- logger.info("Found table data in data directories. Consider using the CLI to define your schema.");
- else
- logger.info("To create keyspaces and column families, see 'help create keyspace' in the CLI, or set up a schema using the thrift system_* calls.");
}
else
{
- logger.info("Loading schema version " + uuid.toString());
- Collection<KSMetaData> tableDefs = DefsTable.loadFromStorage(uuid);
+ Schema.instance.load(DefsTable.loadFromTable());
+ }
- // happens when someone manually deletes all tables and restarts.
- if (tableDefs.size() == 0)
- {
- logger.warn("No schema definitions were found in local storage.");
- // set version so that migrations leading up to emptiness aren't replayed.
- Schema.instance.setVersion(uuid);
- }
- else // if non-system tables where found, trying to load them
+ Schema.instance.updateVersion();
+ Schema.instance.fixCFMaxId();
+ }
+
+ private static boolean hasExistingNoSystemTables()
+ {
+ for (String dataDir : getAllDataFileLocations())
+ {
+ File dataPath = new File(dataDir);
+ if (dataPath.exists() && dataPath.isDirectory())
{
- Schema.instance.load(tableDefs, uuid);
+ // see if there are other directories present.
+ int dirCount = dataPath.listFiles(new FileFilter()
+ {
+ public boolean accept(File pathname)
+ {
+ return pathname.isDirectory();
+ }
+ }).length;
+
+ if (dirCount > 0)
+ return true;
}
}
- Schema.instance.fixCFMaxId();
+ return false;
}
public static IAuthenticator getAuthenticator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index a2b552d..2196156 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -18,19 +18,25 @@
package org.apache.cassandra.config;
+import java.io.IOException;
import java.util.*;
+import com.google.common.base.Objects;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
-import org.apache.avro.util.Utf8;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.ColumnDef;
+
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
public final class KSMetaData
{
@@ -65,7 +71,10 @@ public final class KSMetaData
CFMetaData.SchemaCf,
CFMetaData.IndexCf,
CFMetaData.NodeIdCf,
- CFMetaData.VersionCf);
+ CFMetaData.VersionCf,
+ CFMetaData.SchemaKeyspacesCf,
+ CFMetaData.SchemaColumnFamiliesCf,
+ CFMetaData.SchemaColumnsCf);
return new KSMetaData(Table.SYSTEM_TABLE, LocalStrategy.class, optsWithRF(1), true, cfDefs);
}
@@ -117,28 +126,6 @@ public final class KSMetaData
{
return cfMetaData;
}
-
- public org.apache.cassandra.db.migration.avro.KsDef toAvro()
- {
- org.apache.cassandra.db.migration.avro.KsDef ks = new org.apache.cassandra.db.migration.avro.KsDef();
- ks.name = new Utf8(name);
- ks.strategy_class = new Utf8(strategyClass.getName());
- if (strategyOptions != null)
- {
- ks.strategy_options = new HashMap<CharSequence, CharSequence>();
- for (Map.Entry<String, String> e : strategyOptions.entrySet())
- {
- ks.strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
- }
- }
- ks.cf_defs = SerDeUtils.createArray(cfMetaData.size(), org.apache.cassandra.db.migration.avro.CfDef.SCHEMA$);
- for (CFMetaData cfm : cfMetaData.values())
- ks.cf_defs.add(cfm.toAvro());
-
- ks.durable_writes = durableWrites;
-
- return ks;
- }
@Override
public String toString()
@@ -154,6 +141,7 @@ public final class KSMetaData
return sb.toString();
}
+ @Deprecated
public static KSMetaData fromAvro(org.apache.cassandra.db.migration.avro.KsDef ks)
{
Class<? extends AbstractReplicationStrategy> repStratClass;
@@ -230,4 +218,198 @@ public final class KSMetaData
return ksdef;
}
+
+ public RowMutation diff(KsDef newState, long modificationTimestamp)
+ {
+ KsDef curState = toThrift();
+ RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(name));
+
+ for (KsDef._Fields field : KsDef._Fields.values())
+ {
+ if (field.equals(KsDef._Fields.CF_DEFS))
+ continue;
+
+ Object curValue = curState.getFieldValue(field);
+ Object newValue = newState.getFieldValue(field);
+
+ if (Objects.equal(curValue, newValue))
+ continue;
+
+ m.add(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF, null, AsciiType.instance.fromString(field.getFieldName())),
+ valueAsBytes(newValue),
+ modificationTimestamp);
+ }
+
+ return m;
+ }
+
+ public KSMetaData reloadAttributes() throws IOException
+ {
+ Row ksDefRow = SystemTable.readSchemaRow(name);
+
+ if (ksDefRow.cf == null || ksDefRow.cf.isEmpty())
+ throw new IOException(String.format("%s not found in the schema definitions table (%s).", name, SystemTable.SCHEMA_KEYSPACES_CF));
+
+ return fromSchema(ksDefRow.cf, null);
+ }
+
+ public List<RowMutation> dropFromSchema(long timestamp)
+ {
+ List<RowMutation> mutations = new ArrayList<RowMutation>();
+
+ RowMutation ksMutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(name));
+ ksMutation.delete(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF), timestamp);
+ mutations.add(ksMutation);
+
+ for (CFMetaData cfm : cfMetaData.values())
+ mutations.add(cfm.dropFromSchema(timestamp));
+
+ return mutations;
+ }
+
+ public static RowMutation toSchema(KsDef ksDef, long timestamp) throws IOException
+ {
+ RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksDef.name));
+
+ for (KsDef._Fields field : KsDef._Fields.values())
+ {
+ if (field.equals(KsDef._Fields.CF_DEFS))
+ continue;
+
+ mutation.add(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF,
+ null,
+ AsciiType.instance.fromString(field.getFieldName())),
+ valueAsBytes(ksDef.getFieldValue(field)),
+ timestamp);
+ }
+
+ if (!ksDef.isSetCf_defs())
+ return mutation;
+
+ for (CfDef cf : ksDef.cf_defs)
+ {
+ try
+ {
+ CFMetaData.toSchema(mutation, cf, timestamp);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ return mutation;
+ }
+
+ public RowMutation toSchema(long timestamp) throws IOException
+ {
+ return toSchema(toThrift(), timestamp);
+ }
+
+ /**
+ * Deserialize only Keyspace attributes without nested ColumnFamilies
+ *
+ * @param serializedKsDef Keyspace attributes in serialized form
+ *
+ * @return deserialized keyspace without cf_defs
+ *
+ * @throws IOException if deserialization failed
+ */
+ public static KsDef fromSchema(ColumnFamily serializedKsDef) throws IOException
+ {
+ KsDef ksDef = new KsDef();
+
+ AbstractType comparator = serializedKsDef.getComparator();
+
+ for (IColumn ksAttr : serializedKsDef.getSortedColumns())
+ {
+ if (ksAttr == null || ksAttr.isMarkedForDelete())
+ continue;
+
+ KsDef._Fields field = KsDef._Fields.findByName(comparator.getString(ksAttr.name()));
+ ksDef.setFieldValue(field, deserializeValue(ksAttr.value(), getValueClass(KsDef.class, field.getFieldName())));
+ }
+
+ return ksDef.name == null ? null : ksDef;
+ }
+
+ /**
+ * Deserialize Keyspace with nested ColumnFamilies
+ *
+ * @param serializedKsDef Keyspace in serialized form
+ * @param serializedCFs Collection of the serialized ColumnFamilies
+ *
+ * @return deserialized keyspace with cf_defs
+ *
+ * @throws IOException if deserialization failed
+ */
+ public static KSMetaData fromSchema(ColumnFamily serializedKsDef, ColumnFamily serializedCFs) throws IOException
+ {
+ KsDef ksDef = fromSchema(serializedKsDef);
+
+ assert ksDef != null;
+
+ Map<String, CfDef> cfs = deserializeColumnFamilies(serializedCFs);
+
+ try
+ {
+ CFMetaData[] cfms = new CFMetaData[cfs.size()];
+
+ int index = 0;
+ for (CfDef cfDef : cfs.values())
+ cfms[index++] = CFMetaData.fromThrift(cfDef);
+
+ return fromThrift(ksDef, cfms);
+ }
+ catch (Exception e)
+ {
+ // this is critical because indicates that something is wrong with serialized schema
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Deserialize ColumnFamilies from low-level schema representation, all of them belong to the same keyspace
+ *
+ * @param serializedColumnFamilies ColumnFamilies in the serialized form
+ *
+ * @return map containing name of the ColumnFamily and it's metadata for faster lookup
+ */
+ public static Map<String, CfDef> deserializeColumnFamilies(ColumnFamily serializedColumnFamilies)
+ {
+ Map<String, CfDef> cfs = new HashMap<String, CfDef>();
+
+ if (serializedColumnFamilies == null)
+ return cfs;
+
+ AbstractType<?> comparator = serializedColumnFamilies.getComparator();
+
+ for (IColumn column : serializedColumnFamilies.getSortedColumns())
+ {
+ if (column == null || column.isMarkedForDelete())
+ continue;
+
+ String[] attr = comparator.getString(column.name()).split(":");
+ assert attr.length == 2;
+
+ CfDef cfDef = cfs.get(attr[0]);
+
+ if (cfDef == null)
+ {
+ cfDef = new CfDef();
+ cfs.put(attr[0], cfDef);
+ }
+
+ CfDef._Fields field = CfDef._Fields.findByName(attr[1]);
+ cfDef.setFieldValue(field, deserializeValue(column.value(), getValueClass(CfDef.class, field.getFieldName())));
+ }
+
+ for (CfDef cfDef : cfs.values())
+ {
+ for (ColumnDef columnDef : ColumnDefinition.fromSchema(cfDef.keyspace, cfDef.name))
+ cfDef.addToColumn_metadata(columnDef);
+ }
+
+ return cfs;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index cc977ab..0c8ced4 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -20,30 +20,33 @@ package org.apache.cassandra.config;
import java.io.IOError;
import java.nio.ByteBuffer;
+import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.Pair;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class Schema
{
private static final Logger logger = LoggerFactory.getLogger(Schema.class);
- public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero.
-
- public static final Schema instance = new Schema(INITIAL_VERSION);
+ public static final Schema instance = new Schema();
private static final int MIN_CF_ID = 1000;
private final AtomicInteger cfIdGen = new AtomicInteger(MIN_CF_ID);
@@ -58,50 +61,60 @@ public class Schema
private final BiMap<Pair<String, String>, Integer> cfIdMap = HashBiMap.create();
private volatile UUID version;
+ private final ReadWriteLock versionLock = new ReentrantReadWriteLock();
+
/**
- * Initialize empty schema object with given version
- * @param initialVersion The initial version of the schema
+ * Initialize empty schema object
*/
- public Schema(UUID initialVersion)
- {
- version = initialVersion;
- }
+ public Schema()
+ {}
/**
- * Load up non-system tables and set schema version to the given value
+ * Load up non-system tables
*
* @param tableDefs The non-system table definitions
- * @param version The version of the schema
*
* @return self to support chaining calls
*/
- public Schema load(Collection<KSMetaData> tableDefs, UUID version)
+ public Schema load(Collection<KSMetaData> tableDefs)
{
for (KSMetaData def : tableDefs)
+ load(def);
+
+ return this;
+ }
+
+ /**
+ * Load specific keyspace into Schema
+ *
+ * @param keyspaceDef The keyspace to load up
+ *
+ * @return self to support chaining calls
+ */
+ public Schema load(KSMetaData keyspaceDef)
+ {
+ if (!Migration.isLegalName(keyspaceDef.name))
+ throw new RuntimeException("invalid keyspace name: " + keyspaceDef.name);
+
+ for (CFMetaData cfm : keyspaceDef.cfMetaData().values())
{
- if (!Migration.isLegalName(def.name))
- throw new RuntimeException("invalid keyspace name: " + def.name);
+ if (!Migration.isLegalName(cfm.cfName))
+ throw new RuntimeException("invalid column family name: " + cfm.cfName);
- for (CFMetaData cfm : def.cfMetaData().values())
+ try
{
- if (!Migration.isLegalName(cfm.cfName))
- throw new RuntimeException("invalid column family name: " + cfm.cfName);
-
- try
- {
- load(cfm);
- }
- catch (ConfigurationException ex)
- {
- throw new IOError(ex);
- }
+ load(cfm);
+ }
+ catch (ConfigurationException ex)
+ {
+ throw new IOError(ex);
}
-
- setTableDefinition(def, version);
}
- setVersion(version);
+ setTableDefinition(keyspaceDef);
+
+ fixCFMaxId();
return this;
}
@@ -146,15 +159,13 @@ public class Schema
}
/**
- * Remove table definition from system and update schema version
+ * Remove table definition from system
*
* @param ksm The table definition to remove
- * @param newVersion New version of the system
*/
- public void clearTableDefinition(KSMetaData ksm, UUID newVersion)
+ public void clearTableDefinition(KSMetaData ksm)
{
tables.remove(ksm.name);
- version = newVersion;
}
/**
@@ -319,16 +330,14 @@ public class Schema
}
/**
- * Update (or insert) new table definition and change schema version
+ * Update (or insert) new table definition
*
* @param ksm The metadata about table
- * @param newVersion New schema version
*/
- public void setTableDefinition(KSMetaData ksm, UUID newVersion)
+ public void setTableDefinition(KSMetaData ksm)
{
if (ksm != null)
tables.put(ksm.name, ksm);
- version = newVersion;
}
/**
@@ -381,6 +390,8 @@ public class Schema
logger.debug("Adding {} to cfIdMap", cfm);
cfIdMap.put(key, cfm.cfId);
+
+ fixCFMaxId();
}
/**
@@ -417,15 +428,47 @@ public class Schema
*/
public UUID getVersion()
{
- return version;
+ versionLock.readLock().lock();
+
+ try
+ {
+ return version;
+ }
+ finally
+ {
+ versionLock.readLock().unlock();
+ }
}
/**
- * Set new version of the schema
- * @param newVersion New version of the schema
+ * Read schema from system table and calculate MD5 digest of every row, resulting digest
+ * will be converted into UUID which would act as content-based version of the schema.
*/
- public void setVersion(UUID newVersion)
+ public void updateVersion()
{
- version = newVersion;
+ versionLock.writeLock().lock();
+
+ try
+ {
+ MessageDigest versionDigest = MessageDigest.getInstance("MD5");
+
+ for (Row row : SystemTable.serializedSchema())
+ {
+ if (row.cf == null || row.cf.getColumnCount() == 0)
+ continue;
+
+ row.cf.updateDigest(versionDigest);
+ }
+
+ version = UUID.nameUUIDFromBytes(versionDigest.digest());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ versionLock.writeLock().unlock();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 6f10f50..73112e7 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -20,11 +20,10 @@
*/
package org.apache.cassandra.cql;
-import org.apache.avro.util.Utf8;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.db.migration.avro.CfDef;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import java.nio.ByteBuffer;
@@ -72,7 +71,7 @@ public class AlterTableStatement
{
CFMetaData meta = Schema.instance.getCFMetaData(keyspace, columnFamily);
- CfDef cfDef = meta.toAvro();
+ CfDef cfDef = meta.toThrift();
ByteBuffer columnName = this.oType == OperationType.OPTS ? null
: meta.comparator.fromString(this.columnName);
@@ -89,20 +88,27 @@ public class AlterTableStatement
TypeParser.parse(validator),
null,
null,
- null).toAvro());
+ null).toThrift());
break;
case ALTER:
- ColumnDefinition column = meta.getColumnDefinition(columnName);
+ ColumnDef toUpdate = null;
- if (column == null)
+ for (ColumnDef columnDef : cfDef.column_metadata)
+ {
+ if (columnDef.name.equals(columnName))
+ {
+ toUpdate = columnDef;
+ break;
+ }
+ }
+
+ if (toUpdate == null)
throw new InvalidRequestException(String.format("Column '%s' was not found in CF '%s'",
this.columnName,
columnFamily));
- column.setValidator(TypeParser.parse(validator));
-
- cfDef.column_metadata.add(column.toAvro());
+ toUpdate.setValidation_class(TypeParser.parse(validator).toString());
break;
case DROP:
@@ -121,9 +127,6 @@ public class AlterTableStatement
this.columnName,
columnFamily));
- // it is impossible to use ColumnDefinition.deflate() in remove() method
- // it will throw java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.util.Utf8
- // some where deep inside of Avro
cfDef.column_metadata.remove(toDelete);
break;
@@ -156,13 +159,13 @@ public class AlterTableStatement
}
if (cfProps.hasProperty(CFPropDefs.KW_COMMENT))
{
- cfDef.comment = new Utf8(cfProps.getProperty(CFPropDefs.KW_COMMENT));
+ cfDef.comment = cfProps.getProperty(CFPropDefs.KW_COMMENT);
}
if (cfProps.hasProperty(CFPropDefs.KW_DEFAULTVALIDATION))
{
try
{
- cfDef.default_validation_class = new Utf8(cfProps.getValidator().toString());
+ cfDef.default_validation_class = cfProps.getValidator().toString();
}
catch (ConfigurationException e)
{
@@ -179,20 +182,16 @@ public class AlterTableStatement
if (!cfProps.compactionStrategyOptions.isEmpty())
{
- cfDef.compaction_strategy_options = new HashMap<CharSequence, CharSequence>();
+ cfDef.compaction_strategy_options = new HashMap<String, String>();
for (Map.Entry<String, String> entry : cfProps.compactionStrategyOptions.entrySet())
- {
- cfDef.compaction_strategy_options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
- }
+ cfDef.compaction_strategy_options.put(entry.getKey(), entry.getValue());
}
if (!cfProps.compressionParameters.isEmpty())
{
- cfDef.compression_options = new HashMap<CharSequence, CharSequence>();
+ cfDef.compression_options = new HashMap<String, String>();
for (Map.Entry<String, String> entry : cfProps.compressionParameters.entrySet())
- {
- cfDef.compression_options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
- }
+ cfDef.compression_options.put(entry.getKey(), entry.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DropIndexStatement.java b/src/java/org/apache/cassandra/cql/DropIndexStatement.java
index d70e4d6..4d7aeee 100644
--- a/src/java/org/apache/cassandra/cql/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql/DropIndexStatement.java
@@ -22,20 +22,19 @@ package org.apache.cassandra.cql;
import java.io.IOException;
-import org.apache.avro.util.Utf8;
import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.migration.avro.CfDef;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
import org.apache.cassandra.db.migration.UpdateColumnFamily;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.InvalidRequestException;
public class DropIndexStatement
{
- public final CharSequence index;
+ public final String index;
public DropIndexStatement(String indexName)
{
- index = new Utf8(indexName);
+ index = indexName;
}
public UpdateColumnFamily generateMutation(String keyspace)
@@ -47,7 +46,7 @@ public class DropIndexStatement
for (CFMetaData cfm : ksm.cfMetaData().values())
{
- cfDef = getUpdatedCFDef(cfm.toAvro());
+ cfDef = getUpdatedCFDef(cfm.toThrift());
if (cfDef != null)
break;
}
@@ -62,10 +61,10 @@ public class DropIndexStatement
{
for (ColumnDef column : cfDef.column_metadata)
{
- if (column.index_type != null && column.index_name != null && column.index_name.equals(index))
+ if (column.isSetIndex_type() && column.isSetIndex_name() && column.index_name.equals(index))
{
- column.index_name = null;
- column.index_type = null;
+ column.unsetIndex_name();
+ column.unsetIndex_type();
return cfDef;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 0cf1efc..917fb92 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -731,12 +731,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
@@ -758,12 +752,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.toString());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
@@ -802,16 +790,7 @@ public class QueryProcessor
ThriftValidation.validateCfDef(cf_def, oldCfm);
try
{
- org.apache.cassandra.db.migration.avro.CfDef result1;
- try
- {
- result1 = CFMetaData.fromThrift(cf_def).toAvro();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- applyMigrationOnStage(new UpdateColumnFamily(result1));
+ applyMigrationOnStage(new UpdateColumnFamily(cf_def));
}
catch (ConfigurationException e)
{
@@ -819,12 +798,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.toString());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
@@ -870,12 +843,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
@@ -895,12 +862,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
@@ -922,12 +883,6 @@ public class QueryProcessor
ex.initCause(e);
throw ex;
}
- catch (IOException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
- }
result.type = CqlResultType.VOID;
return result;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 0c6c116..35c289e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -23,6 +23,8 @@ import static org.apache.cassandra.db.DBConstants.*;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.QueryPath;
@@ -30,6 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HeapAllocator;
@@ -255,13 +258,25 @@ public class ColumnFamily extends AbstractColumnContainer
@Override
public int hashCode()
{
- throw new RuntimeException("Not implemented.");
+ return new HashCodeBuilder(373, 75437)
+ .append(cfm)
+ .append(getMarkedForDeleteAt())
+ .append(columns).toHashCode();
}
@Override
public boolean equals(Object o)
{
- throw new RuntimeException("Not implemented.");
+ if (this == o)
+ return true;
+ if (o == null || this.getClass() != o.getClass())
+ return false;
+
+ ColumnFamily comparison = (ColumnFamily) o;
+
+ return cfm.equals(comparison.cfm)
+ && getMarkedForDeleteAt() == comparison.getMarkedForDeleteAt()
+ && ByteBufferUtil.compareUnsigned(digest(this), digest(comparison)) == 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index be3af14..6da1517 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -18,76 +18,35 @@
package org.apache.cassandra.db;
-import java.io.IOError;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.UUID;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
+/**
+ * Called when node receives updated schema state from the schema migration coordinator node.
+ * Such happens when user makes local schema migration on one of the nodes in the ring
+ * (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
+ * (in form of row mutations) to all the alive nodes in the cluster.
+ */
public class DefinitionsUpdateVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
- /** someone sent me their data definitions */
public void doVerb(final Message message, String id)
{
- try
+ logger.debug("Received schema mutation push from " + message.getFrom());
+
+ StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
- // these are the serialized row mutations that I must apply.
- // check versions at every step along the way to make sure migrations are not applied out of order.
- Collection<Column> cols = MigrationManager.makeColumns(message);
- for (Column col : cols)
+ public void runMayThrow() throws Exception
{
- final UUID version = UUIDGen.getUUID(col.name());
- if (version.timestamp() > Schema.instance.getVersion().timestamp())
- {
- final Migration m = Migration.deserialize(col.value(), message.getVersion());
- assert m.getVersion().equals(version);
- StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- // check to make sure the current version is before this one.
- if (Schema.instance.getVersion().timestamp() == version.timestamp())
- logger.debug("Not appling (equal) " + version.toString());
- else if (Schema.instance.getVersion().timestamp() > version.timestamp())
- logger.debug("Not applying (before)" + version.toString());
- else
- {
- logger.debug("Applying {} from {}", m.getClass().getSimpleName(), message.getFrom());
- try
- {
- m.apply();
- // update gossip, but don't contact nodes directly
- m.passiveAnnounce();
- }
- catch (ConfigurationException ex)
- {
- // Trying to apply the same migration twice. This happens as a result of gossip.
- logger.debug("Migration not applied " + ex.getMessage());
- }
- }
- }
- });
- }
+ DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion());
}
- }
- catch (IOException ex)
- {
- throw new IOError(ex);
- }
+ });
}
-}
+}
\ No newline at end of file