Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/25 02:16:53 UTC

[2/4] Allow concurrent schema migrations patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-1391

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 3d7e062..ca62a46 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -18,85 +18,384 @@
 
 package org.apache.cassandra.db;
 
-import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.db.migration.MigrationHelper;
+import org.apache.cassandra.db.migration.avro.KsDef;
+import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
 
+/**
+ * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
+ * load/distribution easy. This replaces the old mechanism, in which local migrations were serialized, stored in
+ * system.Migrations and used for schema distribution.
+ *
+ * SCHEMA_KEYSPACES_CF layout:
+ *
+ * <key (AsciiType)>
+ *   ascii => json_serialized_value
+ *   ...
+ * </key>
+ *
+ * Where <key> is the name of a keyspace, e.g. "ks".
+ *
+ * SCHEMA_COLUMNFAMILIES_CF layout:
+ *
+ * <key (AsciiType)>
+ *     composite(ascii, ascii) => json_serialized_value
+ * </key>
+ *
+ * Where <key> is the name of a keyspace, e.g. "ks", the first component of the column name is the name of the
+ * ColumnFamily, and the last component is the name of the ColumnFamily attribute.
+ *
+ * SCHEMA_COLUMNS_CF layout:
+ *
+ * <key (AsciiType)>
+ *     composite(ascii, ascii, ascii) => json_serialized value
+ * </key>
+ *
+ * Where <key> is the name of a keyspace, e.g. "ks".
+ *
+ * Column names were made composite to support the 3-level nesting which represents the following structure:
+ * "ColumnFamily name":"column name":"column attribute" => "value"
+ *
+ * Example of schema (using CLI):
+ *
+ * schema_keyspaces
+ * ----------------
+ * RowKey: ks
+ *  => (column=durable_writes, value=true, timestamp=1327061028312185000)
+ *  => (column=name, value="ks", timestamp=1327061028312185000)
+ *  => (column=replication_factor, value=0, timestamp=1327061028312185000)
+ *  => (column=strategy_class, value="org.apache.cassandra.locator.NetworkTopologyStrategy", timestamp=1327061028312185000)
+ *  => (column=strategy_options, value={"datacenter1":"1"}, timestamp=1327061028312185000)
+ *
+ * schema_columnfamilies
+ * ---------------------
+ * RowKey: ks
+ *  => (column=cf:bloom_filter_fp_chance, value=0.0, timestamp=1327061105833119000)
+ *  => (column=cf:caching, value="NONE", timestamp=1327061105833119000)
+ *  => (column=cf:column_type, value="Standard", timestamp=1327061105833119000)
+ *  => (column=cf:comment, value="ColumnFamily", timestamp=1327061105833119000)
+ *  => (column=cf:default_validation_class, value="org.apache.cassandra.db.marshal.BytesType", timestamp=1327061105833119000)
+ *  => (column=cf:gc_grace_seconds, value=864000, timestamp=1327061105833119000)
+ *  => (column=cf:id, value=1000, timestamp=1327061105833119000)
+ *  => (column=cf:key_alias, value="S0VZ", timestamp=1327061105833119000)
+ *  ... part of the output omitted.
+ *
+ * schema_columns
+ * --------------
+ * RowKey: ks
+ *  => (column=cf:c:index_name, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:index_options, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:index_type, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:name, value="aGVsbG8=", timestamp=1327061105833119000)
+ *  => (column=cf:c:validation_class, value="org.apache.cassandra.db.marshal.AsciiType", timestamp=1327061105833119000)
+ */
 public class DefsTable
 {
+    private final static Logger logger = LoggerFactory.getLogger(DefsTable.class);
+
+    // unbuffered decoders
+    private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
+
     // column name for the schema storing serialized keyspace definitions
     // NB: must be an invalid keyspace name
     public static final ByteBuffer DEFINITION_SCHEMA_COLUMN_NAME = ByteBufferUtil.bytes("Avro/Schema");
 
-    /** dumps current keyspace definitions to storage */
-    public static synchronized void dumpToStorage(UUID version) throws IOException
+    /* dumps current keyspace definitions to storage */
+    public static synchronized void dumpToStorage(Collection<KSMetaData> keyspaces) throws IOException
+    {
+        long timestamp = System.currentTimeMillis();
+
+        for (KSMetaData ksMetaData : keyspaces)
+            ksMetaData.toSchema(timestamp).apply();
+    }
+
+    /**
+     * Load keyspace definitions from the system keyspace (system.SCHEMA_KEYSPACES_CF)
+     *
+     * @return Collection of found keyspace definitions
+     *
+     * @throws IOException if reading SCHEMA_KEYSPACES_CF failed
+     */
+    public static Collection<KSMetaData> loadFromTable() throws IOException
     {
-        final ByteBuffer versionKey = Migration.toUTF8Bytes(version);
+        List<Row> serializedSchema = SystemTable.serializedSchema(SystemTable.SCHEMA_KEYSPACES_CF);
 
-        // build a list of keyspaces
-        Collection<String> ksnames = org.apache.cassandra.config.Schema.instance.getNonSystemTables();
+        List<KSMetaData> keyspaces = new ArrayList<KSMetaData>();
 
-        // persist keyspaces under new version
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, versionKey);
-        long now = System.currentTimeMillis();
-        for (String ksname : ksnames)
+        for (Row row : serializedSchema)
         {
-            KSMetaData ksm = org.apache.cassandra.config.Schema.instance.getTableDefinition(ksname);
-            rm.add(new QueryPath(Migration.SCHEMA_CF, null, ByteBufferUtil.bytes(ksm.name)), SerDeUtils.serialize(ksm.toAvro()), now);
+            if (row.cf == null || row.cf.isEmpty() || row.cf.isMarkedForDelete())
+                continue;
+
+            keyspaces.add(KSMetaData.fromSchema(row.cf, serializedColumnFamilies(row.key)));
         }
-        // add the schema
-        rm.add(new QueryPath(Migration.SCHEMA_CF,
-                             null,
-                             DEFINITION_SCHEMA_COLUMN_NAME),
-                             ByteBufferUtil.bytes(org.apache.cassandra.db.migration.avro.KsDef.SCHEMA$.toString()),
-                             now);
-        rm.apply();
-
-        // apply new version
-        rm = new RowMutation(Table.SYSTEM_TABLE, Migration.LAST_MIGRATION_KEY);
-        rm.add(new QueryPath(Migration.SCHEMA_CF, null, Migration.LAST_MIGRATION_KEY),
-               ByteBuffer.wrap(UUIDGen.decompose(version)),
-               now);
-        rm.apply();
+
+        return keyspaces;
+    }
+
+    private static ColumnFamily serializedColumnFamilies(DecoratedKey ksNameKey)
+    {
+        ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
+        return cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
     }
 
-    /** loads a version of keyspace definitions from storage */
+    /**
+     * Loads a version of keyspace definitions from storage (using the old SCHEMA_CF as a data source).
+     * Note: if definitions were found in SCHEMA_CF, this method loads them into the new schema-handling
+     * column families (see SCHEMA_KEYSPACES_CF).
+     *
+     * @param version The version of the latest migration.
+     *
+     * @return Collection of found keyspace definitions
+     *
+     * @throws IOException if reading SCHEMA_CF or deserializing the Avro schema failed
+     */
     public static synchronized Collection<KSMetaData> loadFromStorage(UUID version) throws IOException
     {
-        DecoratedKey vkey = StorageService.getPartitioner().decorateKey(Migration.toUTF8Bytes(version));
+        DecoratedKey vkey = StorageService.getPartitioner().decorateKey(toUTF8Bytes(version));
         Table defs = Table.open(Table.SYSTEM_TABLE);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.SCHEMA_CF);
-        QueryFilter filter = QueryFilter.getIdentityFilter(vkey, new QueryPath(Migration.SCHEMA_CF));
-        ColumnFamily cf = cfStore.getColumnFamily(filter);
+        ColumnFamily cf = cfStore.getColumnFamily(QueryFilter.getIdentityFilter(vkey, new QueryPath(Migration.SCHEMA_CF)));
         IColumn avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME);
-        if (avroschema == null)
-            // TODO: more polite way to handle this?
-            throw new RuntimeException("Cannot read system table! Are you upgrading a pre-release version?");
 
-        ByteBuffer value = avroschema.value();
-        Schema schema = Schema.parse(ByteBufferUtil.string(value));
+        Collection<KSMetaData> keyspaces = Collections.emptyList();
 
-        // deserialize keyspaces using schema
-        Collection<KSMetaData> keyspaces = new ArrayList<KSMetaData>();
-        for (IColumn column : cf.getSortedColumns())
+        if (avroschema != null)
         {
-            if (column.name().equals(DEFINITION_SCHEMA_COLUMN_NAME))
-                continue;
-            org.apache.cassandra.db.migration.avro.KsDef ks = SerDeUtils.deserialize(schema, column.value(), new org.apache.cassandra.db.migration.avro.KsDef());
-            keyspaces.add(KSMetaData.fromAvro(ks));
+            ByteBuffer value = avroschema.value();
+            org.apache.avro.Schema schema = org.apache.avro.Schema.parse(ByteBufferUtil.string(value));
+
+            // deserialize keyspaces using schema
+            keyspaces = new ArrayList<KSMetaData>();
+
+            for (IColumn column : cf.getSortedColumns())
+            {
+                if (column.name().equals(DEFINITION_SCHEMA_COLUMN_NAME))
+                    continue;
+                KsDef ks = deserializeAvro(schema, column.value(), new KsDef());
+                keyspaces.add(KSMetaData.fromAvro(ks));
+            }
+
+            // store deserialized keyspaces into new place
+            dumpToStorage(keyspaces);
+
+            logger.info("Truncating deprecated system column families (migrations, schema)...");
+            MigrationHelper.dropColumnFamily(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF);
+            MigrationHelper.dropColumnFamily(Table.SYSTEM_TABLE, Migration.SCHEMA_CF);
         }
+
         return keyspaces;
     }
+
+    /**
+     * Merge the remote schema (in the form of row mutations) with the local schema and mutate the ks/cf metadata
+     * objects (which also involves filesystem operations for keyspace/ColumnFamily add/drop)
+     *
+     * @param data The data of the message from the remote node containing schema information
+     * @param version The version of the message
+     *
+     * @throws ConfigurationException If one of the metadata attributes has an invalid value
+     * @throws IOException If data was corrupted in transit or a filesystem operation failed
+     */
+    public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException
+    {
+        // save current state of the schema
+        Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF);
+        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
+
+        // apply remote mutations
+        for (RowMutation mutation : MigrationManager.deserializeMigrationMessage(data, version))
+            mutation.apply();
+
+        if (!StorageService.instance.isClientMode())
+            MigrationHelper.flushSchemaCFs();
+
+        Schema.instance.updateVersion();
+
+        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF));
+        mergeColumnFamilies(oldColumnFamilies, SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF));
+
+        // it is safe to drop a keyspace only when all nested ColumnFamilies were deleted
+        for (String keyspaceToDrop : keyspacesToDrop)
+            MigrationHelper.dropKeyspace(keyspaceToDrop);
+    }
+
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+            throws ConfigurationException, IOException
+    {
+        // calculate the difference between old and new states (note that entriesOnlyOnLeft() will always be empty)
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
+
+        /**
+         * First, check whether any new keyspaces were added.
+         */
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily ksAttrs = entry.getValue();
+
+            // we don't care about nested ColumnFamilies here because those are going to be processed separately
+            if (!ksAttrs.isEmpty())
+                MigrationHelper.addKeyspace(KSMetaData.fromSchema(entry.getValue(), null));
+        }
+
+        /**
+         * Second, check whether any keyspaces were re-created; in this context re-created means that
+         * they were previously deleted but still exist in the low-level schema as empty keys.
+         */
+
+        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
+
+        // instead of looping over all modified entries and repeatedly skipping already-processed keys,
+        // store the "left to process" items and iterate over only those
+        List<DecoratedKey> leftToProcess = new ArrayList<DecoratedKey>();
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : modifiedEntries.entrySet())
+        {
+            ColumnFamily prevValue = entry.getValue().leftValue();
+            ColumnFamily newValue = entry.getValue().rightValue();
+
+            if (prevValue.isEmpty())
+            {
+                MigrationHelper.addKeyspace(KSMetaData.fromSchema(newValue, null));
+                continue;
+            }
+
+            leftToProcess.add(entry.getKey());
+        }
+
+        if (leftToProcess.size() == 0)
+            return Collections.emptySet();
+
+        /**
+         * Finally, update the modified keyspaces and collect the keyspaces to be dropped later.
+         */
+
+        Set<String> keyspacesToDrop = new HashSet<String>();
+
+        for (DecoratedKey key : leftToProcess)
+        {
+            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(key);
+
+            ColumnFamily newState = valueDiff.rightValue();
+
+            if (newState.isEmpty())
+                keyspacesToDrop.add(AsciiType.instance.getString(key.key));
+            else
+                MigrationHelper.updateKeyspace(KSMetaData.fromSchema(newState));
+        }
+
+        return keyspacesToDrop;
+    }
+
+    private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+            throws ConfigurationException, IOException
+    {
+        // calculate the difference between old and new states (note that entriesOnlyOnLeft() will always be empty)
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
+
+        // check if any new Keyspaces with ColumnFamilies were added.
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily cfAttrs = entry.getValue();
+
+            if (!cfAttrs.isEmpty())
+            {
+                Map<String, CfDef> cfDefs = KSMetaData.deserializeColumnFamilies(cfAttrs);
+
+                for (CfDef cfDef : cfDefs.values())
+                    MigrationHelper.addColumnFamily(cfDef);
+            }
+        }
+
+        // deal with modified ColumnFamilies (remember that all of a keyspace's nested ColumnFamilies are stored in a single row)
+        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
+
+        for (DecoratedKey keyspace : modifiedEntries.keySet())
+        {
+            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(keyspace);
+
+            ColumnFamily prevValue = valueDiff.leftValue(); // state before external modification
+            ColumnFamily newValue = valueDiff.rightValue(); // updated state
+
+            if (prevValue.isEmpty()) // whole keyspace was deleted and now it's re-created
+            {
+                for (CfDef cfDef : KSMetaData.deserializeColumnFamilies(newValue).values())
+                    MigrationHelper.addColumnFamily(cfDef);
+            }
+            else if (newValue.isEmpty()) // whole keyspace is deleted
+            {
+                for (CfDef cfDef : KSMetaData.deserializeColumnFamilies(prevValue).values())
+                    MigrationHelper.dropColumnFamily(cfDef.keyspace, cfDef.name);
+            }
+            else // the nested ColumnFamilies were modified, so perform a nested diff to determine what actually changed
+            {
+                String ksName = AsciiType.instance.getString(keyspace.key);
+
+                Map<String, CfDef> oldCfDefs = new HashMap<String, CfDef>();
+                for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())
+                    oldCfDefs.put(cfm.cfName, cfm.toThrift());
+
+                Map<String, CfDef> newCfDefs = KSMetaData.deserializeColumnFamilies(newValue);
+
+                MapDifference<String, CfDef> cfDefDiff = Maps.difference(oldCfDefs, newCfDefs);
+
+                for (CfDef cfDef : cfDefDiff.entriesOnlyOnRight().values())
+                    MigrationHelper.addColumnFamily(cfDef);
+
+                for (CfDef cfDef : cfDefDiff.entriesOnlyOnLeft().values())
+                    MigrationHelper.dropColumnFamily(cfDef.keyspace, cfDef.name);
+
+                for (MapDifference.ValueDifference<CfDef> cfDef : cfDefDiff.entriesDiffering().values())
+                    MigrationHelper.updateColumnFamily(cfDef.rightValue());
+            }
+        }
+    }
+
+    private static ByteBuffer toUTF8Bytes(UUID version)
+    {
+        return ByteBufferUtil.bytes(version.toString());
+    }
+
+    /**
+     * Deserialize a single object based on the given Schema.
+     *
+     * @param writer writer's schema
+     * @param bytes Buffer to deserialize from
+     * @param ob An empty object to deserialize into (must not be null).
+     *
+     * @return the deserialized Avro object
+     *
+     * @throws IOException if deserialization failed
+     */
+    public static <T extends SpecificRecord> T deserializeAvro(org.apache.avro.Schema writer, ByteBuffer bytes, T ob) throws IOException
+    {
+        BinaryDecoder dec = DIRECT_DECODERS.createBinaryDecoder(ByteBufferUtil.getArray(bytes), null);
+        SpecificDatumReader<T> reader = new SpecificDatumReader<T>(writer);
+        reader.setExpected(ob.getSchema());
+        return reader.read(ob, dec);
+    }
 }
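
[Editor's illustration, not part of the patch] The mergeKeyspaces() and mergeColumnFamilies() methods above rely entirely on Guava's Maps.difference() to classify schema changes as added, re-created, updated or dropped. A minimal, self-contained sketch of that pattern, using plain String maps in place of the DecoratedKey/ColumnFamily maps; all names below are illustrative:

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;

public class SchemaDiffSketch
{
    public static void main(String[] args)
    {
        Map<String, String> before = new HashMap<String, String>();
        before.put("ks1", "rf=1"); // unchanged
        before.put("ks2", "rf=2"); // attributes will change
        before.put("ks3", "");     // empty value ~ keyspace previously dropped (row key still present)

        Map<String, String> after = new HashMap<String, String>();
        after.put("ks1", "rf=1");
        after.put("ks2", "rf=3");  // updated
        after.put("ks3", "rf=1");  // re-created
        after.put("ks4", "rf=1");  // brand new

        MapDifference<String, String> diff = Maps.difference(before, after);

        // brand-new entries appear only on the right (updated) side
        System.out.println("added: " + diff.entriesOnlyOnRight().keySet());

        // modified entries expose both sides, so the caller can distinguish
        // re-created (left empty), dropped (right empty) and updated keyspaces
        for (Map.Entry<String, MapDifference.ValueDifference<String>> e : diff.entriesDiffering().entrySet())
            System.out.println(e.getKey() + ": '" + e.getValue().leftValue() + "' -> '" + e.getValue().rightValue() + "'");
    }
}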

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
new file mode 100644
index 0000000..66c7f77
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOError;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.MigrationManager;
+
+/**
+ * Sends its current schema state, in the form of row mutations, in reply to the remote node's request.
+ * Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
+ */
+public class MigrationRequestVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
+
+    public void doVerb(Message message, String id)
+    {
+        logger.debug("Received migration request from {}.", message.getFrom());
+
+        try
+        {
+            Message response = message.getInternalReply(MigrationManager.serializeSchema(SystemTable.serializeSchema(), message.getVersion()), message.getVersion());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+}
\ No newline at end of file
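
[Editor's illustration, not part of the patch] The verb handler above implements the pull side of schema reconciliation: when gossip reveals a schema disagreement, the out-of-date node asks a peer for its schema and the peer replies with its serialized schema mutations. A rough, framework-free sketch of that request/reply shape; the types below are invented stand-ins, not the Cassandra messaging API:

// Illustrative stand-ins only; this is not the Cassandra messaging API.
interface ReplyChannelSketch
{
    void reply(byte[] payload, String replyTo);
}

public class SchemaRequestHandlerSketch
{
    private final ReplyChannelSketch channel;

    public SchemaRequestHandlerSketch(ReplyChannelSketch channel)
    {
        this.channel = channel;
    }

    // analogous to doVerb(): serialize the local schema state and send it back to the requester
    public void onSchemaRequest(String from)
    {
        channel.reply(serializeLocalSchema(), from);
    }

    private byte[] serializeLocalSchema()
    {
        // in the real handler this is MigrationManager.serializeSchema(SystemTable.serializeSchema(), version)
        return "serialized schema mutations".getBytes();
    }

    public static void main(String[] args)
    {
        ReplyChannelSketch printing = new ReplyChannelSketch()
        {
            public void reply(byte[] payload, String replyTo)
            {
                System.out.println("reply to " + replyTo + ": " + new String(payload));
            }
        };

        new SchemaRequestHandlerSketch(printing).onSchemaRequest("/10.0.0.2");
    }
}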

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index c831382..ef16eb0 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -23,11 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
@@ -36,10 +32,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql.QueryProcessor;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.migration.MigrationHelper;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Constants;
@@ -54,6 +54,11 @@ public class SystemTable
     public static final String INDEX_CF = "IndexInfo";
     public static final String NODE_ID_CF = "NodeIdInfo";
     public static final String VERSION_CF = "Versions";
+    // see layout description in the DefsTable class header
+    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
+    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
+    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
+
     private static final ByteBuffer LOCATION_KEY = ByteBufferUtil.bytes("L");
     private static final ByteBuffer RING_KEY = ByteBufferUtil.bytes("Ring");
     private static final ByteBuffer BOOTSTRAP_KEY = ByteBufferUtil.bytes("Bootstrap");
@@ -506,4 +511,107 @@ public class SystemTable
         }
         return l;
     }
+
+    /**
+     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return the CFS responsible for holding the low-level serialized schema
+     */
+    public static ColumnFamilyStore schemaCFS(String cfName)
+    {
+        return Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(cfName);
+    }
+
+    public static List<Row> serializedSchema()
+    {
+        List<Row> schema = new ArrayList<Row>();
+
+        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
+
+        return schema;
+    }
+
+    /**
+     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return low-level schema representation (each row represents an individual Keyspace or ColumnFamily)
+     */
+    public static List<Row> serializedSchema(String schemaCfName)
+    {
+        Token minToken = StorageService.getPartitioner().getMinimumToken();
+
+        return schemaCFS(schemaCfName).getRangeSlice(null,
+                                                     new Range<RowPosition>(minToken.minKeyBound(),
+                                                                            minToken.maxKeyBound()),
+                                                     Integer.MAX_VALUE,
+                                                     new IdentityQueryFilter(),
+                                                     null);
+    }
+
+    public static Collection<RowMutation> serializeSchema()
+    {
+        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<DecoratedKey, RowMutation>();
+
+        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
+
+        return mutationMap.values();
+    }
+
+    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+    {
+        for (Row schemaRow : serializedSchema(schemaCfName))
+        {
+            RowMutation mutation = mutationMap.get(schemaRow.key);
+
+            if (mutation == null)
+            {
+                mutationMap.put(schemaRow.key, new RowMutation(Table.SYSTEM_TABLE, schemaRow));
+                continue;
+            }
+
+            mutation.add(schemaRow.cf);
+        }
+    }
+
+    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+    {
+        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
+
+        for (Row schemaEntity : SystemTable.serializedSchema(cfName))
+            schema.put(schemaEntity.key, schemaEntity.cf);
+
+        return schema;
+    }
+
+    public static ByteBuffer getSchemaKSKey(String ksName)
+    {
+        return AsciiType.instance.fromString(ksName);
+    }
+
+    public static Row readSchemaRow(String ksName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(SCHEMA_KEYSPACES_CF)));
+
+        return new Row(key, result);
+    }
+
+    public static Row readSchemaRow(String ksName, String cfName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(key,
+                                                        new QueryPath(SCHEMA_COLUMNFAMILIES_CF),
+                                                        MigrationHelper.searchComposite(cfName, true),
+                                                        MigrationHelper.searchComposite(cfName, false),
+                                                        false,
+                                                        Integer.MAX_VALUE);
+
+        return new Row(key, result);
+    }
 }
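
[Editor's illustration, not part of the patch] serializeSchema() above folds rows from the three schema column families into one RowMutation per row key, so a whole keyspace's schema travels as a single mutation. A hedged sketch of that group-by-key accumulation using plain collections; Row and RowMutation are replaced by illustrative stand-ins:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupSchemaRowsSketch
{
    // stand-in for a schema Row: a row key plus the column family payload it came from
    static class Row
    {
        final String key, cf;
        Row(String key, String cf) { this.key = key; this.cf = cf; }
    }

    public static void main(String[] args)
    {
        List<Row> rows = Arrays.asList(new Row("ks", "schema_keyspaces"),
                                       new Row("ks", "schema_columnfamilies"),
                                       new Row("ks", "schema_columns"),
                                       new Row("other_ks", "schema_keyspaces"));

        // one "mutation" per row key, mirroring the Map<DecoratedKey, RowMutation> in serializeSchema()
        Map<String, List<String>> mutations = new HashMap<String, List<String>>();

        for (Row row : rows)
        {
            List<String> mutation = mutations.get(row.key);
            if (mutation == null)
            {
                mutation = new ArrayList<String>();
                mutations.put(row.key, mutation);
            }
            mutation.add(row.cf);
        }

        // prints one entry per row key, each listing the schema CF payloads grouped under it
        System.out.println(mutations);
    }
}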

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java b/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
index fd17fe5..b3762ca 100644
--- a/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
@@ -1,17 +1,3 @@
-package org.apache.cassandra.db.migration;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,79 +15,38 @@ import org.apache.cassandra.utils.UUIDGen;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.cassandra.db.migration;
+
+import java.io.IOException;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 
-public class AddColumnFamily extends Migration
+public class AddColumnFamily extends Migration
 {
-    private CFMetaData cfm;
+    private final CFMetaData cfm;
     
-    /** Required no-arg constructor */
-    protected AddColumnFamily() { /* pass */ }
-    
-    public AddColumnFamily(CFMetaData cfm) throws ConfigurationException, IOException
+    public AddColumnFamily(CFMetaData cfm) throws ConfigurationException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        this.cfm = cfm;
-        KSMetaData ksm = schema.getTableDefinition(cfm.ksName);
-        
+        super(System.nanoTime());
+
+        KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName);
+
         if (ksm == null)
-            throw new ConfigurationException("No such keyspace: " + cfm.ksName);
+            throw new ConfigurationException(String.format("Can't add ColumnFamily '%s' to Keyspace '%s': Keyspace does not exist.", cfm.cfName, cfm.ksName));
         else if (ksm.cfMetaData().containsKey(cfm.cfName))
-            throw new ConfigurationException(String.format("%s already exists in keyspace %s",
-                                                           cfm.cfName,
-                                                           cfm.ksName));
+            throw new ConfigurationException(String.format("Can't add ColumnFamily '%s' to Keyspace '%s': Already exists.", cfm.cfName, cfm.ksName));
         else if (!Migration.isLegalName(cfm.cfName))
-            throw new ConfigurationException("Invalid column family name: " + cfm.cfName);
-        for (Map.Entry<ByteBuffer, ColumnDefinition> entry : cfm.getColumn_metadata().entrySet())
-        {
-            String indexName = entry.getValue().getIndexName();
-            if (indexName != null && !Migration.isLegalName(indexName))
-                throw new ConfigurationException("Invalid index name: " + indexName);
-        }
+            throw new ConfigurationException(String.format("Can't add ColumnFamily '%s' to Keyspace '%s': Invalid ColumnFamily name.", cfm.cfName, cfm.ksName));
 
-        // clone ksm but include the new cf def.
-        KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
-        
-        rm = makeDefinitionMutation(newKsm, null, newVersion);
-    }
-    
-    private KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm)
-    {
-        return KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
-    }
-    
-    public void applyModels() throws IOException
-    {
-        // reinitialize the table.
-        KSMetaData ksm = schema.getTableDefinition(cfm.ksName);
-        ksm = makeNewKeyspaceDefinition(ksm);
-        try
-        {
-            schema.load(cfm);
-        }
-        catch (ConfigurationException ex)
-        {
-            throw new IOException(ex);
-        }
-        Table.open(cfm.ksName, schema); // make sure it's init-ed w/ the old definitions first, since we're going to call initCf on the new one manually
-        schema.setTableDefinition(ksm, newVersion);
-        // these definitions could have come from somewhere else.
-        schema.fixCFMaxId();
-        if (!StorageService.instance.isClientMode())
-            Table.open(ksm.name, schema).initCf(cfm.cfId, cfm.cfName);
-    }
-
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.AddColumnFamily acf = new org.apache.cassandra.db.migration.avro.AddColumnFamily();
-        acf.cf = cfm.toAvro();
-        mi.migration = acf;
+        this.cfm = cfm;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.AddColumnFamily acf = (org.apache.cassandra.db.migration.avro.AddColumnFamily)mi.migration;
-        cfm = CFMetaData.fromAvro(acf.cf);
+        MigrationHelper.addColumnFamily(cfm, timestamp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/AddKeyspace.java b/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
index 9ba8076..1cd1e99 100644
--- a/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
+++ b/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
@@ -20,70 +20,33 @@ package org.apache.cassandra.db.migration;
 
 import java.io.IOException;
 
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 
 public class AddKeyspace extends Migration
 {
-    private KSMetaData ksm;
+    private final KSMetaData ksm;
     
-    /** Required no-arg constructor */
-    protected AddKeyspace() { /* pass */ }
-    
-    public AddKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
+    public AddKeyspace(KSMetaData ksm) throws ConfigurationException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        
-        if (schema.getTableDefinition(ksm.name) != null)
-            throw new ConfigurationException("Keyspace already exists.");
-        if (!Migration.isLegalName(ksm.name))
-            throw new ConfigurationException("Invalid keyspace name: " + ksm.name);
+        super(System.nanoTime());
+
+        if (Schema.instance.getTableDefinition(ksm.name) != null)
+            throw new ConfigurationException(String.format("Can't add Keyspace '%s': Already exists.", ksm.name));
+        else if (!Migration.isLegalName(ksm.name))
+            throw new ConfigurationException(String.format("Can't add Keyspace '%s': Invalid name.", ksm.name));
         for (CFMetaData cfm : ksm.cfMetaData().values())
             if (!Migration.isLegalName(cfm.cfName))
-                throw new ConfigurationException("Invalid column family name: " + cfm.cfName);
-        
-        this.ksm = ksm;
-        rm = makeDefinitionMutation(ksm, null, newVersion);
-    }
+                throw new ConfigurationException(String.format("Can't add Keyspace '%s': Invalid ColumnFamily name '%s'.", ksm.name, cfm.cfName));
 
-    public void applyModels() throws IOException
-    {
-        for (CFMetaData cfm : ksm.cfMetaData().values())
-        {
-            try
-            {
-                schema.load(cfm);
-            }
-            catch (ConfigurationException ex)
-            {
-                // throw RTE since this indicates a table,cf maps to an existing ID. It shouldn't if this is really a
-                // new keyspace.
-                throw new RuntimeException(ex);
-            }
-        }
-        schema.setTableDefinition(ksm, newVersion);
-        // these definitions could have come from somewhere else.
-        schema.fixCFMaxId();
-        if (!StorageService.instance.isClientMode())
-        {
-            Table.open(ksm.name, schema);
-        }
-    }
-    
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.AddKeyspace aks = new org.apache.cassandra.db.migration.avro.AddKeyspace();
-        aks.ks = ksm.toAvro();
-        mi.migration = aks;
+        this.ksm = ksm;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.AddKeyspace aks = (org.apache.cassandra.db.migration.avro.AddKeyspace)mi.migration;
-        ksm = KSMetaData.fromAvro(aks.ks);
+        MigrationHelper.addKeyspace(ksm, timestamp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java b/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
index 0407781..2f2bed2 100644
--- a/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
@@ -1,17 +1,3 @@
-package org.apache.cassandra.db.migration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,78 +15,41 @@ import org.apache.cassandra.utils.UUIDGen;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.cassandra.db.migration;
 
+import java.io.IOException;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 
 public class DropColumnFamily extends Migration
 {
-    private String tableName;
-    private String cfName;
-    
-    /** Required no-arg constructor */
-    protected DropColumnFamily() { /* pass */ }
+    private final String ksName;
+    private final String cfName;
     
-    public DropColumnFamily(String tableName, String cfName) throws ConfigurationException, IOException
+    public DropColumnFamily(String ksName, String cfName) throws ConfigurationException
     {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        this.tableName = tableName;
-        this.cfName = cfName;
-        
-        KSMetaData ksm = schema.getTableDefinition(tableName);
+        super(System.nanoTime());
+
+        KSMetaData ksm = Schema.instance.getTableDefinition(ksName);
         if (ksm == null)
-            throw new ConfigurationException("No such keyspace: " + tableName);
+            throw new ConfigurationException("Can't drop ColumnFamily: No such keyspace '" + ksName + "'.");
         else if (!ksm.cfMetaData().containsKey(cfName))
-            throw new ConfigurationException("CF is not defined in that keyspace.");
-        
-        KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
-        rm = makeDefinitionMutation(newKsm, null, newVersion);
-    }
-
-    private KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm)
-    {
-        // clone ksm but do not include the new def
-        CFMetaData cfm = ksm.cfMetaData().get(cfName);
-        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
-        newCfs.remove(cfm);
-        assert newCfs.size() == ksm.cfMetaData().size() - 1;
-        return KSMetaData.cloneWith(ksm, newCfs);
-    }
-
-    public void applyModels() throws IOException
-    {
-        ColumnFamilyStore cfs = Table.open(tableName, schema).getColumnFamilyStore(cfName);
+            throw new ConfigurationException(String.format("Can't drop ColumnFamily (ks=%s, cf=%s) : Not defined in that keyspace.", ksName, cfName));
 
-        // reinitialize the table.
-        KSMetaData existing = schema.getTableDefinition(tableName);
-        CFMetaData cfm = existing.cfMetaData().get(cfName);
-        KSMetaData ksm = makeNewKeyspaceDefinition(existing);
-        schema.purge(cfm);
-        schema.setTableDefinition(ksm, newVersion);
-
-        if (!StorageService.instance.isClientMode())
-        {
-            cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily));
-            Table.open(ksm.name, schema).dropCf(cfm.cfId);
-        }
-    }
-    
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.DropColumnFamily dcf = new org.apache.cassandra.db.migration.avro.DropColumnFamily();
-        dcf.ksname = new org.apache.avro.util.Utf8(tableName);
-        dcf.cfname = new org.apache.avro.util.Utf8(cfName);
-        mi.migration = dcf;
+        this.ksName = ksName;
+        this.cfName = cfName;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.DropColumnFamily dcf = (org.apache.cassandra.db.migration.avro.DropColumnFamily)mi.migration;
-        tableName = dcf.ksname.toString();
-        cfName = dcf.cfname.toString();
+        MigrationHelper.dropColumnFamily(ksName, cfName, timestamp);
     }
 
     @Override
     public String toString()
     {
-        return String.format("Drop column family: %s.%s", tableName, cfName);
+        return String.format("Drop column family: %s.%s", ksName, cfName);
     }
 }
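
[Editor's illustration, not part of the patch] After this change the Migration subclasses (AddColumnFamily, AddKeyspace, DropColumnFamily, DropKeyspace) all share the same shape: validate against Schema.instance in the constructor, then delegate the actual change to MigrationHelper from applyImpl(), with the base class handling the flush and version update. A stripped-down sketch of that template-method structure, with illustrative names standing in for the real classes:

// Hedged sketch only: the exception types and the "helper" call are simplified stand-ins.
abstract class MigrationSketch
{
    protected final long timestamp;

    protected MigrationSketch(long modificationTimestamp)
    {
        timestamp = modificationTimestamp;
    }

    // template method: subclasses only supply applyImpl()
    public final void apply() throws Exception
    {
        applyImpl();
        // the real base class also flushes the schema CFs and updates the Schema version here
    }

    protected abstract void applyImpl() throws Exception;
}

class DropThingSketch extends MigrationSketch
{
    private final String name;

    DropThingSketch(String name)
    {
        super(System.nanoTime());
        // constructor-time validation, mirroring the ConfigurationException checks in the real subclasses
        if (name == null || name.isEmpty())
            throw new IllegalArgumentException("Can't drop '" + name + "': invalid name.");
        this.name = name;
    }

    protected void applyImpl()
    {
        // MigrationHelper.dropXxx(name, timestamp) in the real code
        System.out.println("dropping " + name + " with timestamp " + timestamp);
    }

    public static void main(String[] args) throws Exception
    {
        new DropThingSketch("ks1").apply();
    }
}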

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/DropKeyspace.java b/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
index 00d9b22..c6e200e 100644
--- a/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
+++ b/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
@@ -20,65 +20,28 @@ package org.apache.cassandra.db.migration;
 
 import java.io.IOException;
 
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 
 public class DropKeyspace extends Migration
 {
-    private String name;
+    private final String name;
     
-    /** Required no-arg constructor */
-    protected DropKeyspace() { /* pass */ }
-    
-    public DropKeyspace(String name) throws ConfigurationException, IOException
-    {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        this.name = name;
-        KSMetaData ksm = schema.getTableDefinition(name);
-        if (ksm == null)
-            throw new ConfigurationException("Keyspace does not exist.");
-        rm = makeDefinitionMutation(null, ksm, newVersion);
-    }
-
-    public void applyModels() throws IOException
+    public DropKeyspace(String name) throws ConfigurationException
     {
-        String snapshotName = Table.getTimestampedSnapshotName(name);
-        KSMetaData ksm = schema.getTableDefinition(name);
+        super(System.nanoTime());
 
-        // remove all cfs from the table instance.
-        for (CFMetaData cfm : ksm.cfMetaData().values())
-        {
-            ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName);
-            schema.purge(cfm);
-            if (!StorageService.instance.isClientMode())
-            {
-                cfs.snapshot(snapshotName);
-                Table.open(ksm.name, schema).dropCf(cfm.cfId);
-            }
-        }
+        KSMetaData ksm = Schema.instance.getTableDefinition(name);
+        if (ksm == null)
+            throw new ConfigurationException("Can't drop keyspace '" + name + "' because it does not exist.");
 
-        // remove the table from the static instances.
-        Table.clear(ksm.name, schema);
-        // reset defs.
-        schema.clearTableDefinition(ksm, newVersion);
-    }
-    
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.DropKeyspace dks = new org.apache.cassandra.db.migration.avro.DropKeyspace();
-        dks.ksname = new org.apache.avro.util.Utf8(name);
-        mi.migration = dks;
+        this.name = name;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.DropKeyspace dks = (org.apache.cassandra.db.migration.avro.DropKeyspace)mi.migration;
-        name = dks.ksname.toString();
+        MigrationHelper.dropKeyspace(name, timestamp);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/Migration.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/Migration.java b/src/java/org/apache/cassandra/db/migration/Migration.java
index f969a9b..762fec3 100644
--- a/src/java/org/apache/cassandra/db/migration/Migration.java
+++ b/src/java/org/apache/cassandra/db/migration/Migration.java
@@ -19,48 +19,31 @@
 package org.apache.cassandra.db.migration;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.io.SerDeUtils;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
- * A migration represents a single metadata mutation (cf dropped, added, etc.).  Migrations can be applied locally, or
- * serialized and sent to another machine where it can be applied there. Each migration has a version represented by
- * a TimeUUID that can be used to look up both the Migration itself (see getLocalMigrations) as well as a serialization
- * of the Keyspace definition that was modified.
- * 
- * There are three parts to a migration (think of it as a schema update):
- * 1. data is written to the schema cf.
- * 2. the migration is serialized to the migrations cf.
- * 3. updated models are applied to the cassandra instance.
+ * A migration represents a single metadata mutation (cf dropped, added, etc.).
+ *
+ * There are two parts to a migration (think of it as a schema update):
+ * 1. data is written to the schema cf (SCHEMA_KEYSPACES_CF).
+ * 2. updated models are applied to the cassandra instance.
  * 
- * Since steps 1, 2 and 3 are not committed atomically, care should be taken to ensure that a node/cluster is reasonably
- * quiescent with regard to the keyspace or columnfamily whose schema is being modified.
- * 
- * Each class that extends Migration is required to implement a no arg constructor, which will be used to inflate the
- * object from it's serialized form.
+ * Since these steps are not committed atomically, care should be taken to ensure that a node/cluster is reasonably
+ * quiescent with regard to the Keyspace or ColumnFamily whose schema is being modified.
  */
 public abstract class Migration
 {
@@ -69,107 +52,52 @@ public abstract class Migration
     public static final String NAME_VALIDATOR_REGEX = "\\w+";
     public static final String MIGRATIONS_CF = "Migrations";
     public static final String SCHEMA_CF = "Schema";
-    public static final ByteBuffer MIGRATIONS_KEY = ByteBufferUtil.bytes("Migrations Key");
     public static final ByteBuffer LAST_MIGRATION_KEY = ByteBufferUtil.bytes("Last Migration");
-    
-    protected RowMutation rm;
-    protected UUID newVersion;
-    protected UUID lastVersion;
-
-    // the migration in column form, used when announcing to others
-    private IColumn column;
 
-    // cast of the schema at the time when migration was initialized
-    protected final Schema schema = Schema.instance;
+    protected final long timestamp;
 
-    /** Subclasses must have a matching constructor */
-    protected Migration() { }
-
-    Migration(UUID newVersion, UUID lastVersion)
+    Migration(long modificationTimestamp)
     {
-        this();
-        this.newVersion = newVersion;
-        this.lastVersion = lastVersion;
+        timestamp = modificationTimestamp;
     }
 
-    /** apply changes */
-    public final void apply() throws IOException, ConfigurationException
+    public final void apply() throws ConfigurationException, IOException
     {
-        // ensure migration is serial. don't apply unless the previous version matches.
-        if (!schema.getVersion().equals(lastVersion))
-            throw new ConfigurationException("Previous version mismatch. cannot apply.");
-        if (newVersion.timestamp() <= lastVersion.timestamp())
-            throw new ConfigurationException("New version timestamp is not newer than the current version timestamp.");
-        // write to schema
-        assert rm != null;
-        if (!StorageService.instance.isClientMode())
-        {
-            rm.apply();
+        applyImpl();
 
-            long now = System.currentTimeMillis();
-            ByteBuffer buf = serialize();
-            RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, MIGRATIONS_KEY);
-            ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, MIGRATIONS_CF);
-            column = new Column(ByteBuffer.wrap(UUIDGen.decompose(newVersion)), buf, now);
-            cf.addColumn(column);
-            migration.add(cf);
-            migration.apply();
-            
-            // note that we're storing this in the system table, which is not replicated
-            logger.info("Applying migration {} {}", newVersion.toString(), toString());
-            migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY);
-            migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), ByteBuffer.wrap(UUIDGen.decompose(newVersion)), now);
-            migration.apply();
+        if (!StorageService.instance.isClientMode())
+            MigrationHelper.flushSchemaCFs();
 
-            // if we fail here, there will be schema changes in the CL that will get replayed *AFTER* the schema is loaded.
-            // CassandraDaemon checks for this condition (the stored version will be greater than the loaded version)
-            // and calls MigrationManager.applyMigrations(loaded version, stored version).
-        
-            // flush changes out of memtables so we don't need to rely on the commit log.
-            ColumnFamilyStore[] schemaStores = new ColumnFamilyStore[] {
-                Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(Migration.MIGRATIONS_CF),
-                Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(Migration.SCHEMA_CF)
-            };
-            List<Future<?>> flushes = new ArrayList<Future<?>>();
-            for (ColumnFamilyStore cfs : schemaStores)
-                flushes.add(cfs.forceFlush());
-            for (Future<?> f : flushes)
-            {
-                if (f == null)
-                    // applying the migration triggered a flush independently
-                    continue;
-                try
-                {
-                    f.get();
-                }
-                catch (ExecutionException e)
-                {
-                    throw new IOException(e);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-        }
-        
-        applyModels(); 
+        Schema.instance.updateVersion();
     }
 
-    /** send this migration immediately to existing nodes in the cluster.  apply() must be called first. */
+    /**
+     * Class-specific apply implementation where the schema migration logic should be put
+     *
+     * @throws IOException on any I/O related error.
+     * @throws ConfigurationException if the object is misconfigured.
+     */
+    protected abstract void applyImpl() throws ConfigurationException, IOException;
+
+    /** Send the schema update (in the form of row mutations) to live nodes in the cluster. apply() must be called first. */
     public final void announce()
     {
         assert !StorageService.instance.isClientMode();
-        assert column != null;
-        MigrationManager.announce(column);
+        MigrationManager.announce(SystemTable.serializeSchema());
         passiveAnnounce(); // keeps gossip in sync w/ what we just told everyone
     }
 
+    /** Announce new schema version over Gossip */
     public final void passiveAnnounce()
     {
-        MigrationManager.passiveAnnounce(newVersion);
+        MigrationManager.passiveAnnounce(Schema.instance.getVersion());
     }
 
+    /**
+     * Used only when the node still has the old-style migration schema (i.e., it has just been upgraded).
+     * @return the UUID identifying the version of the last applied migration
+     */
+    @Deprecated
     public static UUID getLastMigrationId()
     {
         DecoratedKey<?> dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
@@ -183,137 +111,6 @@ public abstract class Migration
             return UUIDGen.getUUID(cf.getColumn(LAST_MIGRATION_KEY).value());
     }
     
-    /** keep in mind that applyLive might happen on another machine */
-    abstract void applyModels() throws IOException;
-    
-    /** Deflate this Migration into an Avro object. */
-    public abstract void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi);
-    
-    /** Inflate this Migration from an Avro object: called after the required no-arg constructor. */
-    public abstract void subinflate(org.apache.cassandra.db.migration.avro.Migration mi);
-    
-    public UUID getVersion()
-    {
-        return newVersion;
-    }
-
-    /**
-     * Definitions are serialized as a row with a UUID key, with a magical column named DEFINITION_SCHEMA_COLUMN_NAME
-     * (containing the Avro Schema) and a column per keyspace. Each keyspace column contains a avro.KsDef object
-     * encoded with the Avro schema.
-     */
-    final RowMutation makeDefinitionMutation(KSMetaData add, KSMetaData remove, UUID versionId) throws IOException
-    {
-        // collect all keyspaces, while removing 'remove' and adding 'add'
-        List<KSMetaData> ksms = new ArrayList<KSMetaData>();
-        for (String tableName : schema.getNonSystemTables())
-        {
-            if (remove != null && remove.name.equals(tableName) || add != null && add.name.equals(tableName))
-                continue;
-            ksms.add(schema.getTableDefinition(tableName));
-        }
-        if (add != null)
-            ksms.add(add);
-
-        // wrap in mutation
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, toUTF8Bytes(versionId));
-        long now = System.currentTimeMillis();
-        // add a column for each keyspace
-        for (KSMetaData ksm : ksms)
-            rm.add(new QueryPath(SCHEMA_CF, null, ByteBufferUtil.bytes(ksm.name)), SerDeUtils.serialize(ksm.toAvro()), now);
-        // add the schema
-        rm.add(new QueryPath(SCHEMA_CF,
-                             null,
-                             DefsTable.DEFINITION_SCHEMA_COLUMN_NAME),
-                             ByteBufferUtil.bytes(org.apache.cassandra.db.migration.avro.KsDef.SCHEMA$.toString()),
-                             now);
-        return rm;
-    }
-        
-    public ByteBuffer serialize() throws IOException
-    {
-        // super deflate
-        org.apache.cassandra.db.migration.avro.Migration mi = new org.apache.cassandra.db.migration.avro.Migration();
-        mi.old_version = new org.apache.cassandra.utils.avro.UUID();
-        mi.old_version.bytes(UUIDGen.decompose(lastVersion));
-        mi.new_version = new org.apache.cassandra.utils.avro.UUID();
-        mi.new_version.bytes(UUIDGen.decompose(newVersion));
-        mi.classname = new org.apache.avro.util.Utf8(this.getClass().getName());
-        // TODO: Avro RowMutation serialization?
-        DataOutputBuffer dob = new DataOutputBuffer();
-        try
-        {
-            RowMutation.serializer().serialize(rm, dob, MessagingService.version_);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        mi.row_mutation = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-        // sub deflate
-        this.subdeflate(mi);
-
-        // serialize
-        return SerDeUtils.serializeWithSchema(mi);
-    }
-
-    public static Migration deserialize(ByteBuffer bytes, int version) throws IOException
-    {
-        // deserialize
-        org.apache.cassandra.db.migration.avro.Migration mi = SerDeUtils.deserializeWithSchema(bytes, new org.apache.cassandra.db.migration.avro.Migration());
-
-        // create an instance of the migration subclass
-        Migration migration;
-        try
-        {
-            Class<?> migrationClass = Class.forName(mi.classname.toString());
-            Constructor<?> migrationConstructor = migrationClass.getDeclaredConstructor();
-            migrationConstructor.setAccessible(true);
-            migration = (Migration)migrationConstructor.newInstance();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("Invalid migration class: " + mi.classname.toString(), e);
-        }
-        
-        // super inflate
-        migration.lastVersion = UUIDGen.getUUID(ByteBuffer.wrap(mi.old_version.bytes()));
-        migration.newVersion = UUIDGen.getUUID(ByteBuffer.wrap(mi.new_version.bytes()));
-        try
-        {
-            migration.rm = RowMutation.serializer().deserialize(SerDeUtils.createDataInputStream(mi.row_mutation), version);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        // sub inflate
-        migration.subinflate(mi);
-        return migration;
-    }
-    
-    /** load serialized migrations. */
-    public static Collection<IColumn> getLocalMigrations(UUID start, UUID end)
-    {
-        DecoratedKey<?> dkey = StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY);
-        Table defs = Table.open(Table.SYSTEM_TABLE);
-        ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(dkey,
-                                                        new QueryPath(MIGRATIONS_CF),
-                                                        ByteBuffer.wrap(UUIDGen.decompose(start)),
-                                                        ByteBuffer.wrap(UUIDGen.decompose(end)),
-                                                        false,
-                                                        100);
-        ColumnFamily cf = cfStore.getColumnFamily(filter);
-        return cf.getSortedColumns();
-    }
-    
-    public static ByteBuffer toUTF8Bytes(UUID version)
-    {
-        return ByteBufferUtil.bytes(version.toString());
-    }
-    
     public static boolean isLegalName(String s)
     {
         return s.matches(Migration.NAME_VALIDATOR_REGEX);

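As a quick orientation for the refactored Migration contract above, here is a minimal, hypothetical subclass; it is not part of the patch, but the UpdateColumnFamily and UpdateKeyspace diffs below follow the same shape. It assumes the protected timestamp field provided by the base class, and exception handling at the call site is elided.

    package org.apache.cassandra.db.migration;

    import java.io.IOException;

    import org.apache.cassandra.config.ConfigurationException;
    import org.apache.cassandra.config.Schema;

    // Hypothetical example, named to avoid clashing with the real migration classes.
    public class ExampleDropKeyspace extends Migration
    {
        private final String ksName;

        public ExampleDropKeyspace(String ksName) throws ConfigurationException
        {
            super(System.nanoTime()); // the base class keeps this as the schema mutation timestamp

            if (Schema.instance.getKSMetaData(ksName) == null)
                throw new ConfigurationException(ksName + " cannot be dropped because it doesn't exist.");

            this.ksName = ksName;
        }

        protected void applyImpl() throws ConfigurationException, IOException
        {
            // class-specific schema work is delegated to MigrationHelper;
            // Migration.apply() then flushes the schema CFs and updates the schema version,
            // and announce() pushes the serialized schema to live nodes.
            MigrationHelper.dropKeyspace(ksName, timestamp);
        }

        @Override
        public String toString()
        {
            return "Drop keyspace " + ksName;
        }
    }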
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/MigrationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/MigrationHelper.java b/src/java/org/apache/cassandra/db/migration/MigrationHelper.java
new file mode 100644
index 0000000..981e02b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/migration/MigrationHelper.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.migration;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class MigrationHelper
+{
+    private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+    public static ByteBuffer readableColumnName(ByteBuffer columnName, AbstractType comparator)
+    {
+        return ByteBufferUtil.bytes(comparator.getString(columnName));
+    }
+
+    public static ByteBuffer valueAsBytes(Object value)
+    {
+        try
+        {
+            return ByteBuffer.wrap(jsonMapper.writeValueAsBytes(value));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Object deserializeValue(ByteBuffer value, Class<?> valueClass)
+    {
+        try
+        {
+            // Jackson serializes ByteBuffer as byte[], so it needs help with deserialization later
+            if (valueClass.equals(ByteBuffer.class))
+                return ByteBuffer.wrap((byte[]) deserializeValue(value, byte[].class));
+
+            return jsonMapper.readValue(ByteBufferUtil.getArray(value), valueClass);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Class<?> getValueClass(Class<?> klass, String name)
+    {
+        try
+        {
+            return klass.getField(name).getType();
+        }
+        catch (NoSuchFieldException e)
+        {
+            throw new RuntimeException(e); // never happens
+        }
+    }
+
+    public static ByteBuffer searchComposite(String comp1, boolean start)
+    {
+        return compositeNameFor(comp1, !start, null, false, null, false);
+    }
+
+    public static ByteBuffer compositeNameFor(String comp1, String comp2)
+    {
+        return compositeNameFor(comp1, ByteBufferUtil.bytes(comp2), null);
+    }
+
+    public static ByteBuffer compositeNameFor(String comp1, ByteBuffer comp2, String comp3)
+    {
+        return compositeNameFor(comp1, false, comp2, false, comp3, false);
+    }
+
+    public static ByteBuffer compositeNameFor(String comp1, boolean limit1, ByteBuffer comp2, boolean limit2, String comp3, boolean limit3)
+    {
+        int totalSize = 0;
+
+        if (comp1 != null)
+            totalSize += 2 + comp1.length() + 1;
+
+        if (comp2 != null)
+            totalSize += 2 + comp2.remaining() + 1;
+
+        if (comp3 != null)
+            totalSize += 2 + comp3.length() + 1;
+
+        ByteBuffer bytes = ByteBuffer.allocate(totalSize);
+
+        if (comp1 != null)
+        {
+            bytes.putShort((short) comp1.length());
+            bytes.put(comp1.getBytes());
+            bytes.put((byte) (limit1 ? 1 : 0));
+        }
+
+        if (comp2 != null)
+        {
+            int pos = comp2.position(), limit = comp2.limit();
+
+            bytes.putShort((short) comp2.remaining());
+            bytes.put(comp2);
+            bytes.put((byte) (limit2 ? 1 : 0));
+            // restore original range
+            comp2.position(pos).limit(limit);
+        }
+
+        if (comp3 != null)
+        {
+            bytes.putShort((short) comp3.length());
+            bytes.put(comp3.getBytes());
+            bytes.put((byte) (limit3 ? 1 : 0));
+        }
+
+        bytes.rewind();
+
+        return bytes;
+    }
+
+    public static void flushSchemaCFs()
+    {
+        flushSchemaCF(SystemTable.SCHEMA_KEYSPACES_CF);
+        flushSchemaCF(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
+        flushSchemaCF(SystemTable.SCHEMA_COLUMNS_CF);
+    }
+
+    public static void flushSchemaCF(String cfName)
+    {
+        Future<?> flush = SystemTable.schemaCFS(cfName).forceFlush();
+
+        if (flush != null)
+            FBUtilities.waitOnFuture(flush);
+    }
+
+    /* Schema Mutation Helpers */
+
+    public static void addKeyspace(KSMetaData ksm, long timestamp) throws ConfigurationException, IOException
+    {
+        addKeyspace(ksm, timestamp, true);
+    }
+
+    public static void addKeyspace(KSMetaData ksDef) throws ConfigurationException, IOException
+    {
+        addKeyspace(ksDef, -1, false);
+    }
+
+    public static void addColumnFamily(CFMetaData cfm, long timestamp) throws ConfigurationException, IOException
+    {
+        addColumnFamily(cfm, timestamp, true);
+    }
+
+    public static void addColumnFamily(CfDef cfDef) throws ConfigurationException, IOException
+    {
+        try
+        {
+            addColumnFamily(CFMetaData.fromThrift(cfDef), -1, false);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new ConfigurationException(e.getMessage(), e);
+        }
+    }
+
+    public static void updateKeyspace(KsDef newState) throws ConfigurationException, IOException
+    {
+        updateKeyspace(newState, -1, false);
+    }
+
+    public static void updateKeyspace(KsDef newState, long timestamp) throws ConfigurationException, IOException
+    {
+        updateKeyspace(newState, timestamp, true);
+    }
+
+    public static void updateColumnFamily(CfDef newState) throws ConfigurationException, IOException
+    {
+        updateColumnFamily(newState, -1, false);
+    }
+
+    public static void updateColumnFamily(CfDef newState, long timestamp) throws ConfigurationException, IOException
+    {
+        updateColumnFamily(newState, timestamp, true);
+    }
+
+    public static void dropColumnFamily(String ksName, String cfName) throws IOException
+    {
+        dropColumnFamily(ksName, cfName, -1, false);
+    }
+
+    public static void dropColumnFamily(String ksName, String cfName, long timestamp) throws IOException
+    {
+        dropColumnFamily(ksName, cfName, timestamp, true);
+    }
+
+    public static void dropKeyspace(String ksName) throws IOException
+    {
+        dropKeyspace(ksName, -1, false);
+    }
+
+    public static void dropKeyspace(String ksName, long timestamp) throws IOException
+    {
+        dropKeyspace(ksName, timestamp, true);
+    }
+
+    /* Migration Helper implementations */
+
+    private static void addKeyspace(KSMetaData ksm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException
+    {
+        RowMutation keyspaceDef = ksm.toSchema(timestamp);
+
+        if (withSchemaRecord)
+            keyspaceDef.apply();
+
+        Schema.instance.load(ksm);
+
+        if (!StorageService.instance.isClientMode())
+            Table.open(ksm.name);
+    }
+
+    private static void addColumnFamily(CFMetaData cfm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException
+    {
+        KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName);
+        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
+
+        Schema.instance.load(cfm);
+
+        if (withSchemaRecord)
+            cfm.toSchema(timestamp).apply();
+
+        // make sure the table is initialized with the old definitions first,
+        // since we're going to call initCf on the new column family manually
+        Table.open(cfm.ksName);
+
+        Schema.instance.setTableDefinition(ksm);
+
+        if (!StorageService.instance.isClientMode())
+            Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
+    }
+
+    private static void updateKeyspace(KsDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException
+    {
+        KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
+
+        if (withSchemaRecord)
+        {
+            RowMutation schemaUpdate = oldKsm.diff(newState, timestamp);
+            schemaUpdate.apply();
+        }
+
+        KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values());
+
+        Schema.instance.setTableDefinition(newKsm);
+
+        if (!StorageService.instance.isClientMode())
+            Table.open(newState.name).createReplicationStrategy(newKsm);
+    }
+
+    private static void updateColumnFamily(CfDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException
+    {
+        CFMetaData cfm = Schema.instance.getCFMetaData(newState.keyspace, newState.name);
+
+        if (withSchemaRecord)
+        {
+            RowMutation schemaUpdate = cfm.diff(newState, timestamp);
+            schemaUpdate.apply();
+        }
+
+        cfm.reload();
+
+        if (!StorageService.instance.isClientMode())
+        {
+            Table table = Table.open(cfm.ksName);
+            table.getColumnFamilyStore(cfm.cfName).reload();
+        }
+    }
+
+    private static void dropKeyspace(String ksName, long timestamp, boolean withSchemaRecord) throws IOException
+    {
+        KSMetaData ksm = Schema.instance.getTableDefinition(ksName);
+        String snapshotName = Table.getTimestampedSnapshotName(ksName);
+
+        // remove all cfs from the table instance.
+        for (CFMetaData cfm : ksm.cfMetaData().values())
+        {
+            ColumnFamilyStore cfs = Table.open(ksm.name).getColumnFamilyStore(cfm.cfName);
+
+            Schema.instance.purge(cfm);
+
+            if (!StorageService.instance.isClientMode())
+            {
+                cfs.snapshot(snapshotName);
+                Table.open(ksm.name).dropCf(cfm.cfId);
+            }
+        }
+
+        if (withSchemaRecord)
+        {
+            for (RowMutation m : ksm.dropFromSchema(timestamp))
+                m.apply();
+        }
+
+        // remove the table from the static instances.
+        Table.clear(ksm.name);
+        Schema.instance.clearTableDefinition(ksm);
+    }
+
+    private static void dropColumnFamily(String ksName, String cfName, long timestamp, boolean withSchemaRecord) throws IOException
+    {
+        KSMetaData ksm = Schema.instance.getTableDefinition(ksName);
+        ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName);
+
+        // reinitialize the table.
+        CFMetaData cfm = ksm.cfMetaData().get(cfName);
+
+        Schema.instance.purge(cfm);
+        Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm));
+
+        if (withSchemaRecord)
+            cfm.dropFromSchema(timestamp).apply();
+
+        if (!StorageService.instance.isClientMode())
+        {
+            cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily));
+            Table.open(ksm.name).dropCf(cfm.cfId);
+        }
+    }
+
+    private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
+    {
+        // clone ksm, excluding the dropped def
+        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+        newCfs.remove(toExclude);
+        assert newCfs.size() == ksm.cfMetaData().size() - 1;
+        return KSMetaData.cloneWith(ksm, newCfs);
+    }
+}

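To make the helpers above concrete, here is a short, hypothetical fragment (the keyspace, column family and attribute names are made up, and the snippet is not part of the patch) showing how composite schema column names and JSON-serialized attribute values are produced and read back:

    // Composite column name "cf1":"gc_grace_seconds" as used in SCHEMA_COLUMNFAMILIES_CF;
    // each component is encoded as <2-byte length><component bytes><end-of-component byte>.
    ByteBuffer name = MigrationHelper.compositeNameFor("cf1", "gc_grace_seconds");

    // Attribute values are stored JSON-serialized and deserialized the same way.
    ByteBuffer value = MigrationHelper.valueAsBytes(864000);
    int gcGrace = (Integer) MigrationHelper.deserializeValue(value, Integer.class);
    assert gcGrace == 864000;

    // Slice bounds covering every column that belongs to "cf1" in a schema row;
    // searchComposite flips the end-of-component byte to build the finish bound.
    ByteBuffer start = MigrationHelper.searchComposite("cf1", true);
    ByteBuffer finish = MigrationHelper.searchComposite("cf1", false);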
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java b/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
index b3f4b9a..77d014f 100644
--- a/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
@@ -1,15 +1,3 @@
-package org.apache.cassandra.db.migration;
-
-import java.io.IOException;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.migration.avro.ColumnDef;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,81 +15,36 @@ import org.apache.cassandra.utils.UUIDGen;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.cassandra.db.migration;
+
+import java.io.IOException;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.thrift.CfDef;
 
 public class UpdateColumnFamily extends Migration
 {
-    // does not point to a CFM stored in DatabaseDescriptor.
-    private CFMetaData metadata;
-    
-    protected UpdateColumnFamily() { }
-    
-    /** assumes validation has already happened. That is, replacing oldCfm with newCfm is neither illegal or totally whackass. */
-    public UpdateColumnFamily(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException, IOException
-    {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        
-        KSMetaData ksm = schema.getTableDefinition(cf_def.keyspace.toString());
-        if (ksm == null)
-            throw new ConfigurationException("No such keyspace: " + cf_def.keyspace.toString());
-        if (cf_def.column_metadata != null)
-        {
-            for (ColumnDef entry : cf_def.column_metadata)
-            {
-                if (entry.index_name != null && !Migration.isLegalName(entry.index_name.toString()))
-                    throw new ConfigurationException("Invalid index name: " + entry.index_name);
-            }
-        }
+    private final CfDef newState;
 
-        CFMetaData oldCfm = schema.getCFMetaData(cf_def.keyspace.toString(), cf_def.name.toString());
-        
-        // create a copy of the old CF meta data. Apply new settings on top of it.
-        this.metadata = CFMetaData.fromAvro(oldCfm.toAvro());
-        this.metadata.apply(cf_def);
-        
-        // create a copy of the old KS meta data. Use it to create a RowMutation that gets applied to schema and migrations.
-        KSMetaData newKsMeta = KSMetaData.fromAvro(ksm.toAvro());
-        newKsMeta.cfMetaData().get(cf_def.name.toString()).apply(cf_def);
-        rm = makeDefinitionMutation(newKsMeta, null, newVersion);
-    }
-
-    void applyModels() throws IOException
+    public UpdateColumnFamily(CfDef newState) throws ConfigurationException
     {
-        logger.debug("Updating " + schema.getCFMetaData(metadata.cfId) + " to " + metadata);
-        // apply the meta update.
-        try 
-        {
-            schema.getCFMetaData(metadata.cfId).apply(metadata.toAvro());
-        }
-        catch (ConfigurationException ex) 
-        {
-            throw new IOException(ex);
-        }
-        schema.setTableDefinition(null, newVersion);
+        super(System.nanoTime());
 
-        if (!StorageService.instance.isClientMode())
-        {
-            Table table = Table.open(metadata.ksName, schema);
-            ColumnFamilyStore oldCfs = table.getColumnFamilyStore(metadata.cfName);
-            oldCfs.reload();
-        }
-    }
+        if (Schema.instance.getCFMetaData(newState.keyspace, newState.name) == null)
+            throw new ConfigurationException(String.format("(ks=%s, cf=%s) cannot be updated because it doesn't exist.", newState.keyspace, newState.name));
 
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.UpdateColumnFamily update = new org.apache.cassandra.db.migration.avro.UpdateColumnFamily();
-        update.metadata = metadata.toAvro();
-        mi.migration = update;
+        this.newState = newState;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.UpdateColumnFamily update = (org.apache.cassandra.db.migration.avro.UpdateColumnFamily)mi.migration;
-        metadata = CFMetaData.fromAvro(update.metadata);
+        MigrationHelper.updateColumnFamily(newState, timestamp);
     }
 
     @Override
     public String toString()
     {
-        return String.format("Update column family to %s", metadata.toString());
+        return String.format("Update column family with %s", newState);
     }
 }

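A hypothetical caller-side sketch of the new UpdateColumnFamily (not part of the patch; exception handling is elided, and in practice the CfDef would carry the complete current definition with only the changed attributes modified, since it is diffed against the live CFMetaData):

    CfDef cfDef = new CfDef("ks", "cf1"); // thrift-generated constructor taking the required fields
    cfDef.setGc_grace_seconds(864000);    // the attribute being changed

    Migration update = new UpdateColumnFamily(cfDef);
    update.apply();     // runs applyImpl(), flushes the schema CFs, updates the schema version
    update.announce();  // sends the serialized schema to live nodes and gossips the new version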
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37b07935/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java b/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
index 7d90f2c..86968be 100644
--- a/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
+++ b/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
@@ -1,12 +1,3 @@
-package org.apache.cassandra.db.migration;
-
-import java.io.IOException;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -15,80 +6,50 @@ import org.apache.cassandra.utils.UUIDGen;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.cassandra.db.migration;
+
+import java.io.IOException;
 
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.thrift.KsDef;
 
 public class UpdateKeyspace extends Migration
 {
-    private KSMetaData newKsm;
-    private KSMetaData oldKsm;
-    
-    /** Required no-arg constructor */
-    protected UpdateKeyspace() { }
-    
-    /** create migration based on thrift parameters */
-    public UpdateKeyspace(KSMetaData ksm) throws ConfigurationException, IOException
-    {
-        super(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()), Schema.instance.getVersion());
-        
-        assert ksm != null;
-        assert ksm.cfMetaData() != null;
-        if (ksm.cfMetaData().size() > 0)
-            throw new ConfigurationException("Updated keyspace must not contain any column families");
-    
-        // create the new ksm by merging the one passed in with the cf defs from the exisitng ksm.
-        oldKsm = schema.getKSMetaData(ksm.name);
-        if (oldKsm == null)
-            throw new ConfigurationException(ksm.name + " cannot be updated because it doesn't exist.");
+    private final KsDef newState;
 
-        this.newKsm = KSMetaData.cloneWith(ksm, oldKsm.cfMetaData().values());
-        rm = makeDefinitionMutation(newKsm, oldKsm, newVersion);
-    }
-    
-    void applyModels() throws IOException
+    public UpdateKeyspace(KsDef newState) throws ConfigurationException
     {
-        schema.clearTableDefinition(oldKsm, newVersion);
-        schema.setTableDefinition(newKsm, newVersion);
+        super(System.nanoTime());
 
-        Table table = Table.open(newKsm.name, schema);
-        try
-        {
-            table.createReplicationStrategy(newKsm);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
+        if (newState.isSetCf_defs() && newState.getCf_defs().size() > 0)
+            throw new ConfigurationException("Updated keyspace must not contain any column families.");
 
-        logger.info("Keyspace updated. Please perform any manual operations.");
-    }
+        if (Schema.instance.getKSMetaData(newState.name) == null)
+            throw new ConfigurationException(newState.name + " cannot be updated because it doesn't exist.");
 
-    public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
-    {
-        org.apache.cassandra.db.migration.avro.UpdateKeyspace uks = new org.apache.cassandra.db.migration.avro.UpdateKeyspace();
-        uks.newKs = newKsm.toAvro();
-        uks.oldKs = oldKsm.toAvro();
-        mi.migration = uks;
+        this.newState = newState;
     }
 
-    public void subinflate(org.apache.cassandra.db.migration.avro.Migration mi)
+    protected void applyImpl() throws ConfigurationException, IOException
     {
-        org.apache.cassandra.db.migration.avro.UpdateKeyspace uks = (org.apache.cassandra.db.migration.avro.UpdateKeyspace)mi.migration;
-        newKsm = KSMetaData.fromAvro(uks.newKs);
-        oldKsm = KSMetaData.fromAvro(uks.oldKs);
+        MigrationHelper.updateKeyspace(newState, timestamp);
+
+        logger.info("Keyspace updated. Please perform any manual operations.");
     }
 
     @Override
     public String toString()
     {
-        return String.format("Update keyspace %s to %s", oldKsm.toString(), newKsm.toString());
+        return String.format("Update keyspace %s with %s", newState.name, newState);
     }
 }