You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/25 02:16:53 UTC

[1/4] git commit: Allow concurrent schema migrations patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-1391

Updated Branches:
  refs/heads/trunk e594e0dca -> 37b079352


Allow concurrent schema migrations
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-1391


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37b07935
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37b07935
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37b07935

Branch: refs/heads/trunk
Commit: 37b079352d412bb67036aa4130107728b9c8ae0d
Parents: e594e0d
Author: Pavel Yaskevich <po...@gmail.com>
Authored: Sun Jan 22 02:46:02 2012 +0200
Committer: Pavel Yaskevich <po...@gmail.com>
Committed: Wed Jan 25 04:12:36 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |  343 ++++++++++---
 .../apache/cassandra/config/ColumnDefinition.java  |  180 ++++++-
 .../cassandra/config/DatabaseDescriptor.java       |   96 ++--
 .../org/apache/cassandra/config/KSMetaData.java    |  234 ++++++++-
 src/java/org/apache/cassandra/config/Schema.java   |  137 ++++--
 .../apache/cassandra/cql/AlterTableStatement.java  |   45 +-
 .../apache/cassandra/cql/DropIndexStatement.java   |   17 +-
 .../org/apache/cassandra/cql/QueryProcessor.java   |   47 +--
 src/java/org/apache/cassandra/db/ColumnFamily.java |   19 +-
 .../cassandra/db/DefinitionsUpdateVerbHandler.java |   67 +--
 src/java/org/apache/cassandra/db/DefsTable.java    |  391 +++++++++++++--
 .../cassandra/db/MigrationRequestVerbHandler.java  |   53 ++
 src/java/org/apache/cassandra/db/SystemTable.java  |  118 +++++-
 .../cassandra/db/migration/AddColumnFamily.java    |   95 +---
 .../apache/cassandra/db/migration/AddKeyspace.java |   69 +--
 .../cassandra/db/migration/DropColumnFamily.java   |   89 +---
 .../cassandra/db/migration/DropKeyspace.java       |   61 +--
 .../apache/cassandra/db/migration/Migration.java   |  269 ++---------
 .../cassandra/db/migration/MigrationHelper.java    |  371 ++++++++++++++
 .../cassandra/db/migration/UpdateColumnFamily.java |   89 +---
 .../cassandra/db/migration/UpdateKeyspace.java     |   81 +---
 src/java/org/apache/cassandra/io/SerDeUtils.java   |  124 -----
 .../cassandra/service/AbstractCassandraDaemon.java |   14 -
 .../apache/cassandra/service/MigrationManager.java |  304 ++++++------
 .../apache/cassandra/service/StorageService.java   |   14 +-
 .../apache/cassandra/thrift/CassandraServer.java   |   72 +---
 .../org/apache/cassandra/utils/FBUtilities.java    |   27 +-
 .../serialization/0.7/db.migration.Keyspace1.bin   |    1 -
 .../serialization/0.7/db.migration.Keyspace2.bin   |    1 -
 .../serialization/0.7/db.migration.Keyspace3.bin   |  Bin 10498 -> 0 bytes
 .../serialization/0.7/db.migration.Keyspace4.bin   |    1 -
 .../serialization/0.7/db.migration.Keyspace5.bin   |    1 -
 test/unit/org/apache/cassandra/SchemaLoader.java   |    2 +-
 .../apache/cassandra/config/CFMetaDataTest.java    |   35 +-
 .../cassandra/config/ColumnDefinitionTest.java     |    2 +-
 .../cassandra/config/DatabaseDescriptorTest.java   |   26 +-
 test/unit/org/apache/cassandra/db/DefsTest.java    |  112 +----
 .../cassandra/db/migration/SerializationsTest.java |   77 ---
 39 files changed, 2085 insertions(+), 1600 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3e5af50..c7a228f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,7 @@
  * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749)
  * Fix BulkLoader to support new SSTable layout and add stream
    throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
+ * Allow concurrent schema migrations (CASSANDRA-1391)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4c05e75..1afa04f 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -18,30 +18,38 @@
 
 package org.apache.cassandra.config;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
-import org.apache.avro.util.Utf8;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
+
 public final class CFMetaData
 {
     //
@@ -63,11 +71,51 @@ public final class CFMetaData
 
     public static final CFMetaData StatusCf = newSystemMetadata(SystemTable.STATUS_CF, 0, "persistent metadata for the local node", BytesType.instance, null);
     public static final CFMetaData HintsCf = newSystemMetadata(HintedHandOffManager.HINTS_CF, 1, "hinted handoff data", BytesType.instance, BytesType.instance);
+    @Deprecated
     public static final CFMetaData MigrationsCf = newSystemMetadata(Migration.MIGRATIONS_CF, 2, "individual schema mutations", TimeUUIDType.instance, null);
+    @Deprecated
     public static final CFMetaData SchemaCf = newSystemMetadata(Migration.SCHEMA_CF, 3, "current state of the schema", UTF8Type.instance, null);
     public static final CFMetaData IndexCf = newSystemMetadata(SystemTable.INDEX_CF, 5, "indexes that have been completed", UTF8Type.instance, null);
     public static final CFMetaData NodeIdCf = newSystemMetadata(SystemTable.NODE_ID_CF, 6, "nodeId and their metadata", TimeUUIDType.instance, null);
     public static final CFMetaData VersionCf = newSystemMetadata(SystemTable.VERSION_CF, 7, "server version information", UTF8Type.instance, null);
+    public static final CFMetaData SchemaKeyspacesCf = schemaCFDefinition(SystemTable.SCHEMA_KEYSPACES_CF, 8, "keyspace attributes of the schema", AsciiType.instance, 1);
+    public static final CFMetaData SchemaColumnFamiliesCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNFAMILIES_CF, 9, "ColumnFamily attributes of the schema", AsciiType.instance, 2);
+    public static final CFMetaData SchemaColumnsCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNS_CF, 10, "ColumnFamily column attributes of the schema", AsciiType.instance, 3);
+
+    private static CFMetaData schemaCFDefinition(String name, int index, String comment, AbstractType<?> comp, int nestingLevel)
+    {
+        try
+        {
+            AbstractType<?> comparator;
+
+            if (nestingLevel == 1)
+            {
+                comparator = comp;
+            }
+            else
+            {
+                List<AbstractType<?>> composite = new ArrayList<AbstractType<?>>(nestingLevel);
+
+                for (int i = 0; i < nestingLevel; i++)
+                    composite.add(comp);
+
+                comparator = CompositeType.getInstance(composite);
+            }
+
+            return newSystemMetadata(name,
+                                     index,
+                                     comment,
+                                     comparator,
+                                     null)
+                                     .keyValidator(AsciiType.instance)
+                                     .defaultValidator(UTF8Type.instance);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     static
     {
         try
@@ -277,47 +325,7 @@ public final class CFMetaData
         return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
     }
 
-    // converts CFM to avro CfDef
-    public org.apache.cassandra.db.migration.avro.CfDef toAvro()
-    {
-        org.apache.cassandra.db.migration.avro.CfDef cf = new org.apache.cassandra.db.migration.avro.CfDef();
-        cf.id = cfId;
-        cf.keyspace = new Utf8(ksName);
-        cf.name = new Utf8(cfName);
-        cf.column_type = new Utf8(cfType.name());
-        cf.comparator_type = new Utf8(comparator.toString());
-        if (subcolumnComparator != null)
-        {
-            assert cfType == ColumnFamilyType.Super
-                   : String.format("%s CF %s should not have subcomparator %s defined", cfType, cfName, subcolumnComparator);
-            cf.subcomparator_type = new Utf8(subcolumnComparator.toString());
-        }
-        cf.comment = new Utf8(enforceCommentNotNull(comment));
-        cf.read_repair_chance = readRepairChance;
-        cf.replicate_on_write = replicateOnWrite;
-        cf.gc_grace_seconds = gcGraceSeconds;
-        cf.default_validation_class = defaultValidator == null ? null : new Utf8(defaultValidator.toString());
-        cf.key_validation_class = new Utf8(keyValidator.toString());
-        cf.min_compaction_threshold = minCompactionThreshold;
-        cf.max_compaction_threshold = maxCompactionThreshold;
-        cf.merge_shards_chance = mergeShardsChance;
-        cf.key_alias = keyAlias;
-        cf.column_metadata = new ArrayList<ColumnDef>(column_metadata.size());
-        for (ColumnDefinition cd : column_metadata.values())
-            cf.column_metadata.add(cd.toAvro());
-        cf.compaction_strategy = new Utf8(compactionStrategyClass.getName());
-        if (compactionStrategyOptions != null)
-        {
-            cf.compaction_strategy_options = new HashMap<CharSequence, CharSequence>();
-            for (Map.Entry<String, String> e : compactionStrategyOptions.entrySet())
-                cf.compaction_strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
-        }
-        cf.compression_options = compressionParameters.asAvroOptions();
-        cf.bloom_filter_fp_chance = bloomFilterFpChance;
-        cf.caching = new Utf8(caching.toString());
-        return cf;
-    }
-
+    @Deprecated
     public static CFMetaData fromAvro(org.apache.cassandra.db.migration.avro.CfDef cf)
     {
         AbstractType<?> comparator;
@@ -338,7 +346,7 @@ public final class CFMetaData
             throw new RuntimeException("Could not inflate CFMetaData for " + cf, ex);
         }
         Map<ByteBuffer, ColumnDefinition> column_metadata = new TreeMap<ByteBuffer, ColumnDefinition>(BytesType.instance);
-        for (ColumnDef aColumn_metadata : cf.column_metadata)
+        for (org.apache.cassandra.db.migration.avro.ColumnDef aColumn_metadata : cf.column_metadata)
         {
             ColumnDefinition cd = ColumnDefinition.fromAvro(aColumn_metadata);
             if (cd.getIndexType() != null && cd.getIndexName() == null)
@@ -627,22 +635,45 @@ public final class CFMetaData
                       .validate();
     }
 
-    /** updates CFMetaData in-place to match cf_def */
-    public void apply(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
+    public void reload() throws IOException
+    {
+        Row cfDefRow = SystemTable.readSchemaRow(ksName, cfName);
+
+        if (cfDefRow.cf == null || cfDefRow.cf.isEmpty())
+            throw new IOException(String.format("%s not found in the schema definitions table.", ksName + ":" + cfName));
+
+        try
+        {
+            apply(fromSchema(cfDefRow.cf));
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Updates CFMetaData in-place to match cf_def
+     *
+     * *Note*: This method is left public only for DefsTest; don't use it directly!
+     *
+     * @throws ConfigurationException if ks/cf names or cf ids didn't match
+     */
+    public void apply(CfDef cf_def) throws ConfigurationException
     {
         logger.debug("applying {} to {}", cf_def, this);
         // validate
-        if (!cf_def.keyspace.toString().equals(ksName))
+        if (!cf_def.keyspace.equals(ksName))
             throw new ConfigurationException(String.format("Keyspace mismatch (found %s; expected %s)",
                                                            cf_def.keyspace, ksName));
-        if (!cf_def.name.toString().equals(cfName))
+        if (!cf_def.name.equals(cfName))
             throw new ConfigurationException(String.format("Column family mismatch (found %s; expected %s)",
                                                            cf_def.name, cfName));
-        if (!cf_def.id.equals(cfId))
+        if (cf_def.id != cfId)
             throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)",
                                                            cf_def.id, cfId));
 
-        if (!cf_def.column_type.toString().equals(cfType.name()))
+        if (!cf_def.column_type.equals(cfType.name()))
             throw new ConfigurationException("types do not match.");
         if (comparator != TypeParser.parse(cf_def.comparator_type))
             throw new ConfigurationException("comparators do not match.");
@@ -667,15 +698,18 @@ public final class CFMetaData
         maxCompactionThreshold = cf_def.max_compaction_threshold;
         mergeShardsChance = cf_def.merge_shards_chance;
         keyAlias = cf_def.key_alias;
-        if (cf_def.bloom_filter_fp_chance != null)
+        if (cf_def.isSetBloom_filter_fp_chance())
             bloomFilterFpChance = cf_def.bloom_filter_fp_chance;
-        caching = Caching.fromString(cf_def.caching.toString());
+        caching = Caching.fromString(cf_def.caching);
+
+        if (!cf_def.isSetColumn_metadata())
+            cf_def.setColumn_metadata(new ArrayList<ColumnDef>());
 
         // adjust column definitions. figure out who is coming and going.
         Set<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
         Set<ByteBuffer> newColumns = new HashSet<ByteBuffer>();
-        Set<org.apache.cassandra.db.migration.avro.ColumnDef> toAdd = new HashSet<org.apache.cassandra.db.migration.avro.ColumnDef>();
-        for (org.apache.cassandra.db.migration.avro.ColumnDef def : cf_def.column_metadata)
+        Set<ColumnDef> toAdd = new HashSet<ColumnDef>();
+        for (ColumnDef def : cf_def.column_metadata)
         {
             newColumns.add(def.name);
             if (!column_metadata.containsKey(def.name))
@@ -691,36 +725,36 @@ public final class CFMetaData
             column_metadata.remove(indexName);
         }
         // update the ones staying
-        for (org.apache.cassandra.db.migration.avro.ColumnDef def : cf_def.column_metadata)
+        for (ColumnDef def : cf_def.column_metadata)
         {
             ColumnDefinition oldDef = column_metadata.get(def.name);
             if (oldDef == null)
                 continue;
             oldDef.setValidator(TypeParser.parse(def.validation_class));
-            oldDef.setIndexType(def.index_type == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(def.index_type.name()),
-                                ColumnDefinition.getStringMap(def.index_options));
-            oldDef.setIndexName(def.index_name == null ? null : def.index_name.toString());
+            oldDef.setIndexType(def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
+                                def.index_options);
+            oldDef.setIndexName(def.index_name == null ? null : def.index_name);
         }
         // add the new ones coming in.
-        for (org.apache.cassandra.db.migration.avro.ColumnDef def : toAdd)
+        for (ColumnDef def : toAdd)
         {
             AbstractType<?> dValidClass = TypeParser.parse(def.validation_class);
             ColumnDefinition cd = new ColumnDefinition(def.name, 
                                                        dValidClass,
-                                                       def.index_type == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(def.index_type.toString()), 
-                                                       ColumnDefinition.getStringMap(def.index_options),
-                                                       def.index_name == null ? null : def.index_name.toString());
+                                                       def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
+                                                       def.index_options,
+                                                       def.index_name == null ? null : def.index_name);
             column_metadata.put(cd.name, cd);
         }
 
         if (cf_def.compaction_strategy != null)
-            compactionStrategyClass = createCompactionStrategy(cf_def.compaction_strategy.toString());
+            compactionStrategyClass = createCompactionStrategy(cf_def.compaction_strategy);
 
         if (null != cf_def.compaction_strategy_options)
         {
             compactionStrategyOptions = new HashMap<String, String>();
-            for (Map.Entry<CharSequence, CharSequence> e : cf_def.compaction_strategy_options.entrySet())
-                compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
+            for (Map.Entry<String, String> e : cf_def.compaction_strategy_options.entrySet())
+                compactionStrategyOptions.put(e.getKey(), e.getValue());
         }
 
         compressionParameters = CompressionParameters.create(cf_def.compression_options);
@@ -749,9 +783,7 @@ public final class CFMetaData
                 ColumnFamilyStore.class,
                 Map.class // options
             });
-            return (AbstractCompactionStrategy)constructor.newInstance(new Object[] {
-                cfs,
-                compactionStrategyOptions});
+            return (AbstractCompactionStrategy)constructor.newInstance(cfs, compactionStrategyOptions);
         }
         catch (NoSuchMethodException e)
         {
@@ -788,7 +820,7 @@ public final class CFMetaData
         def.setRead_repair_chance(readRepairChance);
         def.setReplicate_on_write(replicateOnWrite);
         def.setGc_grace_seconds(gcGraceSeconds);
-        def.setDefault_validation_class(defaultValidator.toString());
+        def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
         def.setKey_validation_class(keyValidator.toString());
         def.setMin_compaction_threshold(minCompactionThreshold);
         def.setMax_compaction_threshold(maxCompactionThreshold);
@@ -815,9 +847,9 @@ public final class CFMetaData
         return def;
     }
 
-    public static void validateMinMaxCompactionThresholds(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
+    public static void validateMinMaxCompactionThresholds(CfDef cf_def) throws ConfigurationException
     {
-        if (cf_def.min_compaction_threshold != null && cf_def.max_compaction_threshold != null)
+        if (cf_def.isSetMin_compaction_threshold() && cf_def.isSetMax_compaction_threshold())
         {
             if ((cf_def.min_compaction_threshold > cf_def.max_compaction_threshold) &&
                     cf_def.max_compaction_threshold != 0)
@@ -825,15 +857,15 @@ public final class CFMetaData
                 throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold");
             }
         }
-        else if (cf_def.min_compaction_threshold != null)
+        else if (cf_def.isSetMin_compaction_threshold())
         {
             if (cf_def.min_compaction_threshold > DEFAULT_MAX_COMPACTION_THRESHOLD)
             {
-                throw new ConfigurationException("min_compaction_threshold cannot be greather than max_compaction_threshold (default " +
+                throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold (default " +
                                                   DEFAULT_MAX_COMPACTION_THRESHOLD + ")");
             }
         }
-        else if (cf_def.max_compaction_threshold != null)
+        else if (cf_def.isSetMax_compaction_threshold())
         {
             if (cf_def.max_compaction_threshold < DEFAULT_MIN_COMPACTION_THRESHOLD && cf_def.max_compaction_threshold != 0) {
                 throw new ConfigurationException("max_compaction_threshold cannot be less than min_compaction_threshold");
@@ -923,6 +955,167 @@ public final class CFMetaData
         return this;
     }
 
+    /**
+     * Calculate the difference between the current metadata and the given new state and serialize it as a schema RowMutation
+     *
+     * @param newState The new metadata (for the same CF)
+     * @param modificationTimestamp Timestamp to use for mutation
+     *
+     * @return Difference between attributes in form of schema mutation
+     *
+     * @throws ConfigurationException if any of the attributes didn't pass validation
+     */
+    public RowMutation diff(CfDef newState, long modificationTimestamp) throws ConfigurationException
+    {
+        CfDef curState = toThrift();
+        RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+
+        for (CfDef._Fields field : CfDef._Fields.values())
+        {
+            if (field.equals(CfDef._Fields.COLUMN_METADATA))
+                continue; // deal with columns after main attributes
+
+            Object curValue = curState.getFieldValue(field);
+            Object newValue = newState.getFieldValue(field);
+
+            if (Objects.equal(curValue, newValue))
+                continue;
+
+            m.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(curState.name, field.getFieldName())),
+                  valueAsBytes(newValue),
+                  modificationTimestamp);
+        }
+
+        AbstractType nameComparator = cfType.equals(ColumnFamilyType.Super)
+                                        ? subcolumnComparator
+                                        : comparator;
+
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, ColumnDefinition.fromThrift(newState.column_metadata));
+        Map<ByteBuffer, ColumnDef> columnDefMap = ColumnDefinition.toMap(newState.column_metadata);
+
+        // columns that are no longer needed
+        for (ByteBuffer name : columnDiff.entriesOnlyOnLeft().keySet())
+            ColumnDefinition.deleteFromSchema(m, curState.name, nameComparator, name, modificationTimestamp);
+
+        // newly added columns
+        for (ByteBuffer name : columnDiff.entriesOnlyOnRight().keySet())
+            ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+
+        // old columns with updated attributes
+        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+            ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+
+        return m;
+    }
+
+    /**
+     * Remove all CF attributes from schema
+     *
+     * @param timestamp Timestamp to use
+     *
+     * @return RowMutation to use to completely remove cf from schema
+     */
+    public RowMutation dropFromSchema(long timestamp)
+    {
+        RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+        // same comparator choice as diff() and toSchema(), so names are rendered identically on write and delete
+        AbstractType nameComparator = cfType.equals(ColumnFamilyType.Super) ? subcolumnComparator : comparator;
+
+        for (CfDef._Fields field : CfDef._Fields.values())
+            m.delete(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfName, field.getFieldName())), timestamp);
+        for (ColumnDefinition columnDefinition : column_metadata.values())
+            ColumnDefinition.deleteFromSchema(m, cfName, nameComparator, columnDefinition.name, timestamp);
+        return m;
+    }
+
+    /**
+     * Convert current metadata into schema mutation
+     *
+     * @param timestamp Timestamp to use
+     *
+     * @return Low-level representation of the CF
+     *
+     * @throws ConfigurationException if any of the attributes didn't pass validation
+     */
+    public RowMutation toSchema(long timestamp) throws ConfigurationException
+    {
+        RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+
+        toSchema(mutation, toThrift(), timestamp);
+
+        return mutation;
+    }
+
+    /**
+     * Convert given Thrift-serialized metadata into schema mutation
+     *
+     * @param mutation The mutation to include ColumnFamily attributes into (can contain keyspace attributes already)
+     * @param cfDef Thrift-serialized metadata to use as source for schema mutation
+     * @param timestamp Timestamp to use
+     *
+     * @throws ConfigurationException if any of the attributes didn't pass validation
+     */
+    public static void toSchema(RowMutation mutation, CfDef cfDef, long timestamp) throws ConfigurationException
+    {
+        applyImplicitDefaults(cfDef);
+
+        for (CfDef._Fields field : CfDef._Fields.values())
+        {
+            if (field.equals(CfDef._Fields.COLUMN_METADATA))
+                continue;
+
+            mutation.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfDef.name, field.getFieldName())),
+                         valueAsBytes(cfDef.getFieldValue(field)),
+                         timestamp);
+        }
+
+        if (!cfDef.isSetColumn_metadata())
+            return;
+
+        AbstractType comparator = TypeParser.parse(cfDef.column_type.equals("Super")
+                                           ? cfDef.subcomparator_type
+                                           : cfDef.comparator_type);
+
+        for (ColumnDef columnDef : cfDef.column_metadata)
+            ColumnDefinition.addToSchema(mutation, cfDef.name, comparator, columnDef, timestamp);
+    }
+
+    /**
+     * Deserialize CF metadata from low-level representation
+     *
+     * @param serializedCfDef The data to use for deserialization
+     *
+     * @return Thrift-based metadata deserialized from schema
+     *
+     * @throws IOException on any I/O related error
+     */
+    public static CfDef fromSchema(ColumnFamily serializedCfDef) throws IOException
+    {
+        assert serializedCfDef != null;
+
+        CfDef cfDef = new CfDef();
+
+        AbstractType sysComparator = serializedCfDef.getComparator();
+
+        for (IColumn cfAttr : serializedCfDef.getSortedColumns())
+        {
+            if (cfAttr == null || cfAttr.isMarkedForDelete())
+                continue;
+
+            // column name format is <cf>:<attribute name>
+            String[] attr = sysComparator.getString(cfAttr.name()).split(":");
+            assert attr.length == 2;
+
+            CfDef._Fields field = CfDef._Fields.findByName(attr[1]);
+            cfDef.setFieldValue(field, deserializeValue(cfAttr.value(), getValueClass(CfDef.class, field.getFieldName())));
+        }
+
+        for (ColumnDef columnDef : ColumnDefinition.fromSchema(cfDef.keyspace, cfDef.name))
+            cfDef.addToColumn_metadata(columnDef);
+
+        return cfDef;
+    }
+
     @Override
     public String toString()
     {
@@ -951,4 +1144,4 @@ public final class CFMetaData
             .append("caching", caching)
             .toString();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 115bc2a..d8c9db9 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -24,16 +24,22 @@ package org.apache.cassandra.config;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.avro.util.Utf8;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.migration.MigrationHelper;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
+
 public class ColumnDefinition
 {
-    
     public final ByteBuffer name;
     private AbstractType<?> validator;
     private IndexType index_type;
@@ -80,19 +86,7 @@ public class ColumnDefinition
         return result;
     }
 
-    public org.apache.cassandra.db.migration.avro.ColumnDef toAvro()
-    {
-        org.apache.cassandra.db.migration.avro.ColumnDef cd = new org.apache.cassandra.db.migration.avro.ColumnDef();
-        cd.name = ByteBufferUtil.clone(name);
-        cd.validation_class = new Utf8(validator.toString());
-        cd.index_type = index_type == null
-                      ? null
-                      : org.apache.cassandra.db.migration.avro.IndexType.valueOf(index_type.name());
-        cd.index_name = index_name == null ? null : new Utf8(index_name);
-        cd.index_options = getCharSequenceMap(index_options);
-        return cd;
-    }
-
+    @Deprecated
     public static ColumnDefinition fromAvro(org.apache.cassandra.db.migration.avro.ColumnDef cd)
     {
         IndexType index_type = cd.index_type == null ? null : Enum.valueOf(IndexType.class, cd.index_type.name());
@@ -108,6 +102,22 @@ public class ColumnDefinition
         }
     }
 
+    /**
+     * Converts this definition into its Thrift representation; unset index attributes are passed through as null.
+     */
+    public ColumnDef toThrift()
+    {
+        ColumnDef cd = new ColumnDef();
+
+        cd.setName(ByteBufferUtil.clone(name));
+        cd.setValidation_class(validator.toString());
+        cd.setIndex_type(index_type); // already a Thrift IndexType, no conversion needed
+        cd.setIndex_name(index_name);
+        cd.setIndex_options(index_options == null ? null : Maps.newHashMap(index_options));
+
+        return cd;
+    }
+
     public static ColumnDefinition fromThrift(ColumnDef thriftColumnDef) throws ConfigurationException
     {
         return new ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name),
@@ -129,6 +139,133 @@ public class ColumnDefinition
         return cds;
     }
 
+    public static Map<ByteBuffer, ColumnDef> toMap(List<ColumnDef> columnDefs)
+    {
+        Map<ByteBuffer, ColumnDef> map = new HashMap<ByteBuffer, ColumnDef>();
+
+        if (columnDefs == null)
+            return map;
+
+        for (ColumnDef columnDef : columnDefs)
+            map.put(columnDef.name, columnDef);
+
+        return map;
+    }
+
+    /**
+     * Drop specified column from the schema using given row mutation.
+     *
+     * @param mutation   The schema row mutation
+     * @param cfName     The name of the parent ColumnFamily
+     * @param comparator The comparator to serialize column name in human-readable format
+     * @param columnName The column name as a ByteBuffer
+     * @param timestamp  The timestamp to use for column modification
+     */
+    public static void deleteFromSchema(RowMutation mutation, String cfName, AbstractType comparator, ByteBuffer columnName, long timestamp)
+    {
+        toSchema(mutation, comparator, cfName, columnName, null, timestamp, true);
+    }
+
+    /**
+     * Add a new column to the schema, or update an existing one.
+     *
+     * @param mutation   The schema row mutation
+     * @param cfName     The name of the parent ColumnFamily
+     * @param comparator The comparator to serialize column name in human-readable format
+     * @param columnDef  The Thrift-based column definition that contains all attributes
+     * @param timestamp  The timestamp to use for column modification
+     */
+    public static void addToSchema(RowMutation mutation, String cfName, AbstractType comparator, ColumnDef columnDef, long timestamp)
+    {
+        toSchema(mutation, comparator, cfName, columnDef.name, columnDef, timestamp, false);
+    }
+
+    /**
+     * Serialize given ColumnDef into given schema row mutation to add or drop it.
+     *
+     * @param mutation   The mutation to use for serialization
+     * @param comparator The comparator to serialize column name in human-readable format
+     * @param cfName     The name of the parent ColumnFamily
+     * @param columnName The column name as a ByteBuffer
+     * @param columnDef  The Thrift-based column definition that contains all attributes (unused, may be null, when deleting)
+     * @param timestamp  The timestamp to use for column modification
+     * @param delete     The flag which indicates if column should be deleted or added to the schema
+     */
+    private static void toSchema(RowMutation mutation, AbstractType comparator, String cfName, ByteBuffer columnName, ColumnDef columnDef, long timestamp, boolean delete)
+    {
+        for (ColumnDef._Fields field : ColumnDef._Fields.values())
+        {
+            QueryPath path = new QueryPath(SystemTable.SCHEMA_COLUMNS_CF,
+                                           null,
+                                           compositeNameFor(cfName,
+                                                            readableColumnName(columnName, comparator),
+                                                            field.getFieldName()));
+
+            if (delete)
+                mutation.delete(path, timestamp);
+            else
+                mutation.add(path, valueAsBytes(columnDef.getFieldValue(field)), timestamp);
+        }
+    }
+
+    /**
+     * Deserialize all columns of the given ColumnFamily from their low-level schema representation
+     *
+     * @param ksName The corresponding Keyspace
+     * @param cfName The name of the parent ColumnFamily
+     *
+     * @return Thrift-based deserialized representations of the columns (an empty list if none are stored)
+     */
+    public static List<ColumnDef> fromSchema(String ksName, String cfName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(SystemTable.getSchemaKSKey(ksName));
+        ColumnFamilyStore columnsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF);
+        ColumnFamily columns = columnsStore.getColumnFamily(key,
+                                                            new QueryPath(SystemTable.SCHEMA_COLUMNS_CF),
+                                                            MigrationHelper.searchComposite(cfName, true),
+                                                            MigrationHelper.searchComposite(cfName, false),
+                                                            false,
+                                                            Integer.MAX_VALUE);
+
+        if (columns == null || columns.isEmpty())
+            return Collections.emptyList();
+
+        // candidate columns: once all attributes have been read from the serialized state, each candidate
+        // is re-checked and only those with all required fields (name and validation_class) are returned
+        Map<String, ColumnDef> contenders = new HashMap<String, ColumnDef>();
+
+        for (IColumn column : columns.getSortedColumns())
+        {
+            if (column.isMarkedForDelete())
+                continue;
+
+            // column name format <cf>:<column name>:<attribute name>
+            String[] components = columns.getComparator().getString(column.name()).split(":");
+            assert components.length == 3;
+
+            ColumnDef columnDef = contenders.get(components[1]);
+
+            if (columnDef == null)
+            {
+                columnDef = new ColumnDef();
+                contenders.put(components[1], columnDef);
+            }
+
+            ColumnDef._Fields field = ColumnDef._Fields.findByName(components[2]);
+            columnDef.setFieldValue(field, deserializeValue(column.value(), getValueClass(ColumnDef.class, field.getFieldName())));
+        }
+
+        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+
+        for (ColumnDef columnDef : contenders.values())
+        {
+            if (columnDef.isSetName() && columnDef.isSetValidation_class())
+                columnDefs.add(columnDef);
+        }
+
+        return columnDefs;
+    }
+
     @Override
     public String toString()
     {
@@ -189,17 +326,4 @@ public class ColumnDefinition
             
         return stringMap;
     }
-    
-    private static Map<CharSequence, CharSequence> getCharSequenceMap(Map<String,String> stringMap)
-    {
-        if (stringMap == null)
-            return null;
-        
-        Map<CharSequence, CharSequence> charMap = new HashMap<CharSequence, CharSequence>();
-        
-        for (Map.Entry<String, String> entry : stringMap.entrySet())
-            charMap.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
-        
-        return charMap;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9f41a84..e301308 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -36,7 +36,9 @@ import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthority;
 import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.config.Config.RequestSchedulerId;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
@@ -425,6 +427,9 @@ public class DatabaseDescriptor
             Schema.instance.load(CFMetaData.IndexCf);
             Schema.instance.load(CFMetaData.NodeIdCf);
             Schema.instance.load(CFMetaData.VersionCf);
+            Schema.instance.load(CFMetaData.SchemaKeyspacesCf);
+            Schema.instance.load(CFMetaData.SchemaColumnFamiliesCf);
+            Schema.instance.load(CFMetaData.SchemaColumnsCf);
 
             Schema.instance.addSystemTable(systemMeta);
 
@@ -471,61 +476,72 @@ public class DatabaseDescriptor
     /** load keyspace (table) definitions, but do not initialize the table instances. */
     public static void loadSchemas() throws IOException                         
     {
-        // we can load tables from local storage if a version is set in the system table and that acutally maps to
-        // real data in the definitions table.  If we do end up loading from xml, store the defintions so that we
-        // don't load from xml anymore.
-        UUID uuid = Migration.getLastMigrationId();
-        if (uuid == null)
+        ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF);
+
+        // if table with definitions is empty try loading the old way
+        if (schemaCFS.estimateKeys() == 0)
         {
-            logger.info("Couldn't detect any schema definitions in local storage.");
-            // peek around the data directories to see if anything is there.
-            boolean hasExistingTables = false;
-            for (String dataDir : getAllDataFileLocations())
+            // we can load tables from local storage if a version is set in the system table and that actually maps to
+            // real data in the definitions table.  If we do end up loading from xml, store the definitions so that we
+            // don't load from xml anymore.
+            UUID uuid = Migration.getLastMigrationId();
+
+            if (uuid == null)
+            {
+                logger.info("Couldn't detect any schema definitions in local storage.");
+                // peek around the data directories to see if anything is there.
+                if (hasExistingNoSystemTables())
+                    logger.info("Found table data in data directories. Consider using the CLI to define your schema.");
+                else
+                    logger.info("To create keyspaces and column families, see 'help create keyspace' in the CLI, or set up a schema using the thrift system_* calls.");
+            }
+            else
             {
-                File dataPath = new File(dataDir);
-                if (dataPath.exists() && dataPath.isDirectory())
+                logger.info("Loading schema version " + uuid.toString());
+                Collection<KSMetaData> tableDefs = DefsTable.loadFromStorage(uuid);
+
+                // happens when someone manually deletes all tables and restarts.
+                if (tableDefs.size() == 0)
                 {
-                    // see if there are other directories present.
-                    int dirCount = dataPath.listFiles(new FileFilter()
-                    {
-                        public boolean accept(File pathname)
-                        {
-                            return pathname.isDirectory();
-                        }
-                    }).length;
-                    if (dirCount > 0)
-                        hasExistingTables = true;
+                    logger.warn("No schema definitions were found in local storage.");
                 }
-                if (hasExistingTables)
+                else // if non-system tables where found, trying to load them
                 {
-                    break;
+                    Schema.instance.load(tableDefs);
                 }
             }
-            
-            if (hasExistingTables)
-                logger.info("Found table data in data directories. Consider using the CLI to define your schema.");
-            else
-                logger.info("To create keyspaces and column families, see 'help create keyspace' in the CLI, or set up a schema using the thrift system_* calls.");
         }
         else
         {
-            logger.info("Loading schema version " + uuid.toString());
-            Collection<KSMetaData> tableDefs = DefsTable.loadFromStorage(uuid);   
+            Schema.instance.load(DefsTable.loadFromTable());
+        }
 
-            // happens when someone manually deletes all tables and restarts.
-            if (tableDefs.size() == 0)
-            {
-                logger.warn("No schema definitions were found in local storage.");
-                // set version so that migrations leading up to emptiness aren't replayed.
-                Schema.instance.setVersion(uuid);
-            }
-            else // if non-system tables where found, trying to load them
+        Schema.instance.updateVersion();
+        Schema.instance.fixCFMaxId();
+    }
+
+    private static boolean hasExistingNoSystemTables()
+    {
+        for (String dataDir : getAllDataFileLocations())
+        {
+            File dataPath = new File(dataDir);
+            if (dataPath.exists() && dataPath.isDirectory())
             {
-                Schema.instance.load(tableDefs, uuid);
+                // see if there are other directories present.
+                int dirCount = dataPath.listFiles(new FileFilter()
+                {
+                    public boolean accept(File pathname)
+                    {
+                        return pathname.isDirectory();
+                    }
+                }).length;
+
+                if (dirCount > 0)
+                    return true;
             }
         }
 
-        Schema.instance.fixCFMaxId();
+        return false;
     }
 
     public static IAuthenticator getAuthenticator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index a2b552d..2196156 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -18,19 +18,25 @@
 
 package org.apache.cassandra.config;
 
+import java.io.IOException;
 import java.util.*;
 
+import com.google.common.base.Objects;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.avro.util.Utf8;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.ColumnDef;
+
+import static org.apache.cassandra.db.migration.MigrationHelper.*;
 
 public final class KSMetaData
 {
@@ -65,7 +71,10 @@ public final class KSMetaData
                                                 CFMetaData.SchemaCf,
                                                 CFMetaData.IndexCf,
                                                 CFMetaData.NodeIdCf,
-                                                CFMetaData.VersionCf);
+                                                CFMetaData.VersionCf,
+                                                CFMetaData.SchemaKeyspacesCf,
+                                                CFMetaData.SchemaColumnFamiliesCf,
+                                                CFMetaData.SchemaColumnsCf);
         return new KSMetaData(Table.SYSTEM_TABLE, LocalStrategy.class, optsWithRF(1), true, cfDefs);
     }
 
@@ -117,28 +126,6 @@ public final class KSMetaData
     {
         return cfMetaData;
     }
-        
-    public org.apache.cassandra.db.migration.avro.KsDef toAvro()
-    {
-        org.apache.cassandra.db.migration.avro.KsDef ks = new org.apache.cassandra.db.migration.avro.KsDef();
-        ks.name = new Utf8(name);
-        ks.strategy_class = new Utf8(strategyClass.getName());
-        if (strategyOptions != null)
-        {
-            ks.strategy_options = new HashMap<CharSequence, CharSequence>();
-            for (Map.Entry<String, String> e : strategyOptions.entrySet())
-            {
-                ks.strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
-            }
-        }
-        ks.cf_defs = SerDeUtils.createArray(cfMetaData.size(), org.apache.cassandra.db.migration.avro.CfDef.SCHEMA$);
-        for (CFMetaData cfm : cfMetaData.values())
-            ks.cf_defs.add(cfm.toAvro());
-        
-        ks.durable_writes = durableWrites;
-        
-        return ks;
-    }
 
     @Override
     public String toString()
@@ -154,6 +141,7 @@ public final class KSMetaData
         return sb.toString();
     }
 
+    @Deprecated
     public static KSMetaData fromAvro(org.apache.cassandra.db.migration.avro.KsDef ks)
     {
         Class<? extends AbstractReplicationStrategy> repStratClass;
@@ -230,4 +218,198 @@ public final class KSMetaData
 
         return ksdef;
     }
+
+    public RowMutation diff(KsDef newState, long modificationTimestamp)
+    {
+        KsDef curState = toThrift();
+        RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(name));
+
+        for (KsDef._Fields field : KsDef._Fields.values())
+        {
+            if (field.equals(KsDef._Fields.CF_DEFS))
+                continue;
+
+            Object curValue = curState.getFieldValue(field);
+            Object newValue = newState.getFieldValue(field);
+
+            if (Objects.equal(curValue, newValue))
+                continue;
+
+            m.add(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF, null, AsciiType.instance.fromString(field.getFieldName())),
+                  valueAsBytes(newValue),
+                  modificationTimestamp);
+        }
+
+        return m;
+    }
+
+    public KSMetaData reloadAttributes() throws IOException
+    {
+        Row ksDefRow = SystemTable.readSchemaRow(name);
+
+        if (ksDefRow.cf == null || ksDefRow.cf.isEmpty())
+            throw new IOException(String.format("%s not found in the schema definitions table (%s).", name, SystemTable.SCHEMA_KEYSPACES_CF));
+
+        return fromSchema(ksDefRow.cf, null);
+    }
+
+    public List<RowMutation> dropFromSchema(long timestamp)
+    {
+        List<RowMutation> mutations = new ArrayList<RowMutation>();
+
+        RowMutation ksMutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(name));
+        ksMutation.delete(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF), timestamp);
+        mutations.add(ksMutation);
+
+        for (CFMetaData cfm : cfMetaData.values())
+            mutations.add(cfm.dropFromSchema(timestamp));
+
+        return mutations;
+    }
+
+    public static RowMutation toSchema(KsDef ksDef, long timestamp) throws IOException
+    {
+        RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksDef.name));
+
+        for (KsDef._Fields field : KsDef._Fields.values())
+        {
+            if (field.equals(KsDef._Fields.CF_DEFS))
+                continue;
+
+            mutation.add(new QueryPath(SystemTable.SCHEMA_KEYSPACES_CF,
+                                       null,
+                                       AsciiType.instance.fromString(field.getFieldName())),
+                         valueAsBytes(ksDef.getFieldValue(field)),
+                         timestamp);
+        }
+
+        if (!ksDef.isSetCf_defs())
+            return mutation;
+
+        for (CfDef cf : ksDef.cf_defs)
+        {
+            try
+            {
+                CFMetaData.toSchema(mutation, cf, timestamp);
+            }
+            catch (ConfigurationException e)
+            {
+                throw new IOException(e);
+            }
+        }
+
+        return mutation;
+    }
+
+    public RowMutation toSchema(long timestamp) throws IOException
+    {
+        return toSchema(toThrift(), timestamp);
+    }
+
+    /**
+     * Deserialize only Keyspace attributes without nested ColumnFamilies
+     *
+     * @param serializedKsDef Keyspace attributes in serialized form
+     *
+     * @return deserialized keyspace without cf_defs, or null if the keyspace name was not set
+     *
+     * @throws IOException if deserialization failed
+     */
+    public static KsDef fromSchema(ColumnFamily serializedKsDef) throws IOException
+    {
+        KsDef ksDef = new KsDef();
+
+        AbstractType comparator = serializedKsDef.getComparator();
+
+        for (IColumn ksAttr : serializedKsDef.getSortedColumns())
+        {
+            if (ksAttr == null || ksAttr.isMarkedForDelete())
+                continue;
+
+            KsDef._Fields field = KsDef._Fields.findByName(comparator.getString(ksAttr.name()));
+            ksDef.setFieldValue(field, deserializeValue(ksAttr.value(), getValueClass(KsDef.class, field.getFieldName())));
+        }
+
+        return ksDef.name == null ? null : ksDef;
+    }
+
+    /**
+     * Deserialize Keyspace with nested ColumnFamilies
+     *
+     * @param serializedKsDef Keyspace in serialized form
+     * @param serializedCFs Collection of the serialized ColumnFamilies
+     *
+     * @return deserialized keyspace with cf_defs
+     *
+     * @throws IOException if deserialization failed
+     */
+    public static KSMetaData fromSchema(ColumnFamily serializedKsDef, ColumnFamily serializedCFs) throws IOException
+    {
+        KsDef ksDef = fromSchema(serializedKsDef);
+
+        assert ksDef != null;
+
+        Map<String, CfDef> cfs = deserializeColumnFamilies(serializedCFs);
+
+        try
+        {
+            CFMetaData[] cfms = new CFMetaData[cfs.size()];
+
+            int index = 0;
+            for (CfDef cfDef : cfs.values())
+                cfms[index++] = CFMetaData.fromThrift(cfDef);
+
+            return fromThrift(ksDef, cfms);
+        }
+        catch (Exception e)
+        {
+            // this is critical because it indicates that something is wrong with the serialized schema
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Deserialize ColumnFamilies from low-level schema representation, all of them belong to the same keyspace
+     *
+     * @param serializedColumnFamilies ColumnFamilies in the serialized form
+     *
+     * @return map containing name of the ColumnFamily and its metadata for faster lookup
+     */
+    public static Map<String, CfDef> deserializeColumnFamilies(ColumnFamily serializedColumnFamilies)
+    {
+        Map<String, CfDef> cfs = new HashMap<String, CfDef>();
+
+        if (serializedColumnFamilies == null)
+            return cfs;
+
+        AbstractType<?> comparator = serializedColumnFamilies.getComparator();
+
+        for (IColumn column : serializedColumnFamilies.getSortedColumns())
+        {
+            if (column == null || column.isMarkedForDelete())
+                continue;
+
+            String[] attr = comparator.getString(column.name()).split(":");
+            assert attr.length == 2;
+
+            CfDef cfDef = cfs.get(attr[0]);
+
+            if (cfDef == null)
+            {
+                cfDef = new CfDef();
+                cfs.put(attr[0], cfDef);
+            }
+
+            CfDef._Fields field = CfDef._Fields.findByName(attr[1]);
+            cfDef.setFieldValue(field, deserializeValue(column.value(), getValueClass(CfDef.class, field.getFieldName())));
+        }
+
+        for (CfDef cfDef : cfs.values())
+        {
+            for (ColumnDef columnDef : ColumnDefinition.fromSchema(cfDef.keyspace, cfDef.name))
+                cfDef.addToColumn_metadata(columnDef);
+        }
+
+        return cfs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index cc977ab..0c8ced4 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -20,30 +20,33 @@ package org.apache.cassandra.config;
 
 import java.io.IOError;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.Pair;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class Schema
 {
     private static final Logger logger = LoggerFactory.getLogger(Schema.class);
 
-    public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero.
-
-    public static final Schema instance = new Schema(INITIAL_VERSION);
+    public static final Schema instance = new Schema();
 
     private static final int MIN_CF_ID = 1000;
     private final AtomicInteger cfIdGen = new AtomicInteger(MIN_CF_ID);
@@ -58,50 +61,60 @@ public class Schema
     private final BiMap<Pair<String, String>, Integer> cfIdMap = HashBiMap.create();
 
     private volatile UUID version;
+    private final ReadWriteLock versionLock = new ReentrantReadWriteLock();
+
 
     /**
-     * Initialize empty schema object with given version
-     * @param initialVersion The initial version of the schema
+     * Initialize empty schema object
      */
-    public Schema(UUID initialVersion)
-    {
-        version = initialVersion;
-    }
+    public Schema()
+    {}
 
     /**
-     * Load up non-system tables and set schema version to the given value
+     * Load up non-system tables
      *
      * @param tableDefs The non-system table definitions
-     * @param version The version of the schema
      *
      * @return self to support chaining calls
      */
-    public Schema load(Collection<KSMetaData> tableDefs, UUID version)
+    public Schema load(Collection<KSMetaData> tableDefs)
     {
         for (KSMetaData def : tableDefs)
+            load(def);
+
+        return this;
+    }
+
+    /**
+     * Load specific keyspace into Schema
+     *
+     * @param keyspaceDef The keyspace to load up
+     *
+     * @return self to support chaining calls
+     */
+    public Schema load(KSMetaData keyspaceDef)
+    {
+        if (!Migration.isLegalName(keyspaceDef.name))
+            throw new RuntimeException("invalid keyspace name: " + keyspaceDef.name);
+
+        for (CFMetaData cfm : keyspaceDef.cfMetaData().values())
         {
-            if (!Migration.isLegalName(def.name))
-                throw new RuntimeException("invalid keyspace name: " + def.name);
+            if (!Migration.isLegalName(cfm.cfName))
+                throw new RuntimeException("invalid column family name: " + cfm.cfName);
 
-            for (CFMetaData cfm : def.cfMetaData().values())
+            try
             {
-                if (!Migration.isLegalName(cfm.cfName))
-                    throw new RuntimeException("invalid column family name: " + cfm.cfName);
-
-                try
-                {
-                    load(cfm);
-                }
-                catch (ConfigurationException ex)
-                {
-                    throw new IOError(ex);
-                }
+                load(cfm);
+            }
+            catch (ConfigurationException ex)
+            {
+                throw new IOError(ex);
             }
-
-            setTableDefinition(def, version);
         }
 
-        setVersion(version);
+        setTableDefinition(keyspaceDef);
+
+        fixCFMaxId();
 
         return this;
     }
@@ -146,15 +159,13 @@ public class Schema
     }
 
     /**
-     * Remove table definition from system and update schema version
+     * Remove table definition from system
      *
      * @param ksm The table definition to remove
-     * @param newVersion New version of the system
      */
-    public void clearTableDefinition(KSMetaData ksm, UUID newVersion)
+    public void clearTableDefinition(KSMetaData ksm)
     {
         tables.remove(ksm.name);
-        version = newVersion;
     }
 
     /**
@@ -319,16 +330,14 @@ public class Schema
     }
 
     /**
-     * Update (or insert) new table definition and change schema version
+     * Update (or insert) new table definition
      *
      * @param ksm The metadata about table
-     * @param newVersion New schema version
      */
-    public void setTableDefinition(KSMetaData ksm, UUID newVersion)
+    public void setTableDefinition(KSMetaData ksm)
     {
         if (ksm != null)
             tables.put(ksm.name, ksm);
-        version = newVersion;
     }
 
     /**
@@ -381,6 +390,8 @@ public class Schema
 
         logger.debug("Adding {} to cfIdMap", cfm);
         cfIdMap.put(key, cfm.cfId);
+
+        fixCFMaxId();
     }
 
     /**
@@ -417,15 +428,47 @@ public class Schema
      */
     public UUID getVersion()
     {
-        return version;
+        versionLock.readLock().lock();
+
+        try
+        {
+            return version;
+        }
+        finally
+        {
+            versionLock.readLock().unlock();
+        }
     }
 
     /**
-     * Set new version of the schema
-     * @param newVersion New version of the schema
+     * Read schema from system table and feed every non-empty row into a single MD5 digest; the resulting
+     * digest is converted into a UUID which acts as the content-based version of the schema.
      */
-    public void setVersion(UUID newVersion)
+    public void updateVersion()
     {
-        version = newVersion;
+        versionLock.writeLock().lock();
+
+        try
+        {
+            MessageDigest versionDigest = MessageDigest.getInstance("MD5");
+
+            for (Row row : SystemTable.serializedSchema())
+            {
+                if (row.cf == null || row.cf.getColumnCount() == 0)
+                    continue;
+
+                row.cf.updateDigest(versionDigest);
+            }
+
+            version = UUID.nameUUIDFromBytes(versionDigest.digest());
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            versionLock.writeLock().unlock();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 6f10f50..73112e7 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -20,11 +20,10 @@
  */
 package org.apache.cassandra.cql;
 
-import org.apache.avro.util.Utf8;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.db.migration.avro.CfDef;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
 import java.nio.ByteBuffer;
@@ -72,7 +71,7 @@ public class AlterTableStatement
     {
         CFMetaData meta = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
-        CfDef cfDef = meta.toAvro();
+        CfDef cfDef = meta.toThrift();
 
         ByteBuffer columnName = this.oType == OperationType.OPTS ? null
                                                                  : meta.comparator.fromString(this.columnName);
@@ -89,20 +88,27 @@ public class AlterTableStatement
                                                                TypeParser.parse(validator),
                                                                null,
                                                                null,
-                                                               null).toAvro());
+                                                               null).toThrift());
                 break;
 
             case ALTER:
-                ColumnDefinition column = meta.getColumnDefinition(columnName);
+                ColumnDef toUpdate = null;
 
-                if (column == null)
+                for (ColumnDef columnDef : cfDef.column_metadata)
+                {
+                    if (columnDef.name.equals(columnName))
+                    {
+                        toUpdate = columnDef;
+                        break;
+                    }
+                }
+
+                if (toUpdate == null)
                     throw new InvalidRequestException(String.format("Column '%s' was not found in CF '%s'",
                                                                     this.columnName,
                                                                     columnFamily));
 
-                column.setValidator(TypeParser.parse(validator));
-
-                cfDef.column_metadata.add(column.toAvro());
+                toUpdate.setValidation_class(TypeParser.parse(validator).toString());
                 break;
 
             case DROP:
@@ -121,9 +127,6 @@ public class AlterTableStatement
                                                                     this.columnName,
                                                                     columnFamily));
 
-                // it is impossible to use ColumnDefinition.deflate() in remove() method
-                // it will throw java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.util.Utf8
-                // some where deep inside of Avro
                 cfDef.column_metadata.remove(toDelete);
                 break;
 
@@ -156,13 +159,13 @@ public class AlterTableStatement
         }
         if (cfProps.hasProperty(CFPropDefs.KW_COMMENT))
         {
-            cfDef.comment = new Utf8(cfProps.getProperty(CFPropDefs.KW_COMMENT));
+            cfDef.comment = cfProps.getProperty(CFPropDefs.KW_COMMENT);
         }
         if (cfProps.hasProperty(CFPropDefs.KW_DEFAULTVALIDATION))
         {
             try
             {
-                cfDef.default_validation_class = new Utf8(cfProps.getValidator().toString());
+                cfDef.default_validation_class = cfProps.getValidator().toString();
             }
             catch (ConfigurationException e)
             {
@@ -179,20 +182,16 @@ public class AlterTableStatement
 
         if (!cfProps.compactionStrategyOptions.isEmpty())
         {
-            cfDef.compaction_strategy_options = new HashMap<CharSequence, CharSequence>();
+            cfDef.compaction_strategy_options = new HashMap<String, String>();
             for (Map.Entry<String, String> entry : cfProps.compactionStrategyOptions.entrySet())
-            {
-                cfDef.compaction_strategy_options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
-            }
+                cfDef.compaction_strategy_options.put(entry.getKey(), entry.getValue());
         }
 
         if (!cfProps.compressionParameters.isEmpty())
         {
-            cfDef.compression_options = new HashMap<CharSequence, CharSequence>();
+            cfDef.compression_options = new HashMap<String, String>();
             for (Map.Entry<String, String> entry : cfProps.compressionParameters.entrySet())
-            {
-                cfDef.compression_options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
-            }
+                cfDef.compression_options.put(entry.getKey(), entry.getValue());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DropIndexStatement.java b/src/java/org/apache/cassandra/cql/DropIndexStatement.java
index d70e4d6..4d7aeee 100644
--- a/src/java/org/apache/cassandra/cql/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql/DropIndexStatement.java
@@ -22,20 +22,19 @@ package org.apache.cassandra.cql;
 
 import java.io.IOException;
 
-import org.apache.avro.util.Utf8;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.migration.avro.CfDef;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
 import org.apache.cassandra.db.migration.UpdateColumnFamily;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
 public class DropIndexStatement
 {
-    public final CharSequence index;
+    public final String index;
 
     public DropIndexStatement(String indexName)
     {
-        index = new Utf8(indexName);
+        index = indexName;
     }
 
     public UpdateColumnFamily generateMutation(String keyspace)
@@ -47,7 +46,7 @@ public class DropIndexStatement
 
         for (CFMetaData cfm : ksm.cfMetaData().values())
         {
-            cfDef = getUpdatedCFDef(cfm.toAvro());
+            cfDef = getUpdatedCFDef(cfm.toThrift());
             if (cfDef != null)
                 break;
         }
@@ -62,10 +61,10 @@ public class DropIndexStatement
     {
         for (ColumnDef column : cfDef.column_metadata)
         {
-            if (column.index_type != null && column.index_name != null && column.index_name.equals(index))
+            if (column.isSetIndex_type() && column.isSetIndex_name() && column.index_name.equals(index))
             {
-                column.index_name = null;
-                column.index_type = null;
+                column.unsetIndex_name();
+                column.unsetIndex_type();
                 return cfDef;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 0cf1efc..917fb92 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -731,12 +731,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
-                    ex.initCause(e);
-                    throw ex;
-                }
                 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -758,12 +752,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.toString());
-                    ex.initCause(e);
-                    throw ex;
-                }
                 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -802,16 +790,7 @@ public class QueryProcessor
                 ThriftValidation.validateCfDef(cf_def, oldCfm);
                 try
                 {
-                    org.apache.cassandra.db.migration.avro.CfDef result1;
-                    try
-                    {
-                        result1 = CFMetaData.fromThrift(cf_def).toAvro();
-                    }
-                    catch (Exception e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                    applyMigrationOnStage(new UpdateColumnFamily(result1));
+                    applyMigrationOnStage(new UpdateColumnFamily(cf_def));
                 }
                 catch (ConfigurationException e)
                 {
@@ -819,12 +798,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.toString());
-                    ex.initCause(e);
-                    throw ex;
-                }
                 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -870,12 +843,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
-                    ex.initCause(e);
-                    throw ex;
-                }
                 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -895,12 +862,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
-                    ex.initCause(e);
-                    throw ex;
-                }
                 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -922,12 +883,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.getMessage());
-                    ex.initCause(e);
-                    throw ex;
-                }
 
                 result.type = CqlResultType.VOID;
                 return result;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 0c6c116..35c289e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -23,6 +23,8 @@ import static org.apache.cassandra.db.DBConstants.*;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -30,6 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.HeapAllocator;
 
@@ -255,13 +258,25 @@ public class ColumnFamily extends AbstractColumnContainer
     @Override
     public int hashCode()
     {
-        throw new RuntimeException("Not implemented.");
+        return new HashCodeBuilder(373, 75437)
+                    .append(cfm)
+                    .append(getMarkedForDeleteAt())
+                    .append(columns).toHashCode();
     }
 
     @Override
     public boolean equals(Object o)
     {
-        throw new RuntimeException("Not implemented.");
+        if (this == o)
+            return true;
+        if (o == null || this.getClass() != o.getClass())
+            return false;
+
+        ColumnFamily comparison = (ColumnFamily) o;
+
+        return cfm.equals(comparison.cfm)
+                && getMarkedForDeleteAt() == comparison.getMarkedForDeleteAt()
+                && ByteBufferUtil.compareUnsigned(digest(this), digest(comparison)) == 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index be3af14..6da1517 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -18,76 +18,35 @@
 
 package org.apache.cassandra.db;
 
-import java.io.IOError;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.UUID;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+/**
+ * Called when a node receives updated schema state from the schema migration coordinator node.
+ * This happens when a user makes a local schema migration on one of the nodes in the ring
+ * (which then acts as the coordinator) and that node sends (pushes) its updated schema state
+ * (in the form of row mutations) to all the live nodes in the cluster.
+ */
 public class DefinitionsUpdateVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
 
-    /** someone sent me their data definitions */
     public void doVerb(final Message message, String id)
     {
-        try
+        logger.debug("Received schema mutation push from " + message.getFrom());
+
+        StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
         {
-            // these are the serialized row mutations that I must apply.
-            // check versions at every step along the way to make sure migrations are not applied out of order.
-            Collection<Column> cols = MigrationManager.makeColumns(message);
-            for (Column col : cols)
+            public void runMayThrow() throws Exception
             {
-                final UUID version = UUIDGen.getUUID(col.name());
-                if (version.timestamp() > Schema.instance.getVersion().timestamp())
-                {
-                    final Migration m = Migration.deserialize(col.value(), message.getVersion());
-                    assert m.getVersion().equals(version);
-                    StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
-                    {
-                        protected void runMayThrow() throws Exception
-                        {
-                            // check to make sure the current version is before this one.
-                            if (Schema.instance.getVersion().timestamp() == version.timestamp())
-                                logger.debug("Not appling (equal) " + version.toString());
-                            else if (Schema.instance.getVersion().timestamp() > version.timestamp())
-                                logger.debug("Not applying (before)" + version.toString());
-                            else
-                            {
-                                logger.debug("Applying {} from {}", m.getClass().getSimpleName(), message.getFrom());
-                                try
-                                {
-                                    m.apply();
-                                    // update gossip, but don't contact nodes directly
-                                    m.passiveAnnounce();
-                                }
-                                catch (ConfigurationException ex)
-                                {
-                                    // Trying to apply the same migration twice. This happens as a result of gossip.
-                                    logger.debug("Migration not applied " + ex.getMessage());
-                                }
-                            }
-                        }
-                    });
-                }
+                DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion());
             }
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
+        });
     }
-}
+}
\ No newline at end of file