Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:14 UTC

[5/5] cassandra git commit: Isolate schema serialization code

Isolate schema serialization code

patch by Aleksey Yeschenko; reviewed by Tyler Hobbs for CASSANDRA-8261


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e9d345f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e9d345f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e9d345f

Branch: refs/heads/trunk
Commit: 3e9d345f0078922950157de4fd4c7992512b43b8
Parents: 32ac6af
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Dec 17 01:12:19 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Dec 17 01:34:16 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 .../org/apache/cassandra/config/CFMetaData.java |  331 +---
 .../cassandra/config/ColumnDefinition.java      |  134 +-
 .../cassandra/config/DatabaseDescriptor.java    |   87 +-
 .../org/apache/cassandra/config/KSMetaData.java |  155 +-
 .../org/apache/cassandra/config/Schema.java     |  248 ++-
 .../cassandra/config/TriggerDefinition.java     |   63 -
 .../org/apache/cassandra/config/UTMetaData.java |   91 +-
 .../cassandra/cql3/functions/Functions.java     |   22 +-
 .../cql3/functions/JavaSourceUDFFactory.java    |    5 +-
 .../cassandra/cql3/functions/UDAggregate.java   |  206 +--
 .../cassandra/cql3/functions/UDFunction.java    |  193 +--
 .../cassandra/cql3/functions/UDHelper.java      |   12 +-
 .../cql3/statements/CreateTableStatement.java   |   24 +-
 .../apache/cassandra/db/AtomicBTreeColumns.java |    3 +-
 .../apache/cassandra/db/BatchlogManager.java    |   14 +-
 .../db/DefinitionsUpdateVerbHandler.java        |    3 +-
 .../org/apache/cassandra/db/DefsTables.java     |  622 --------
 .../cassandra/db/HintedHandOffManager.java      |   16 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |    2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |    2 +-
 .../db/MigrationRequestVerbHandler.java         |    3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  514 ++----
 .../hadoop/ColumnFamilyRecordReader.java        |   28 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |   28 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  |   23 +-
 .../hadoop/pig/AbstractCassandraStorage.java    |   46 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   26 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   10 +-
 .../cassandra/schema/LegacySchemaTables.java    | 1480 ++++++++++++++++++
 .../cassandra/service/CassandraDaemon.java      |    8 +-
 .../apache/cassandra/service/ClientState.java   |    3 +-
 .../cassandra/service/MigrationManager.java     |  127 +-
 .../apache/cassandra/service/MigrationTask.java |    4 +-
 .../apache/cassandra/service/StorageProxy.java  |    2 +-
 .../cassandra/service/StorageService.java       |    4 +-
 .../cassandra/thrift/ThriftConversion.java      |    5 +-
 .../org/apache/cassandra/tools/BulkLoader.java  |    6 +-
 .../apache/cassandra/tools/SSTableExport.java   |    3 +-
 .../apache/cassandra/tools/SSTableImport.java   |    2 +-
 .../cassandra/tools/SSTableLevelResetter.java   |    3 +-
 .../cassandra/tools/StandaloneScrubber.java     |    3 +-
 .../cassandra/tools/StandaloneSplitter.java     |    4 +-
 .../cassandra/tools/StandaloneUpgrader.java     |    2 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   15 +-
 .../config/DatabaseDescriptorTest.java          |    7 +-
 .../org/apache/cassandra/config/DefsTest.java   |  564 -------
 .../apache/cassandra/config/KSMetaDataTest.java |    6 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |    4 +-
 .../cassandra/db/BatchlogManagerTest.java       |    4 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |    8 +-
 .../schema/LegacySchemaTablesTest.java          |  568 +++++++
 .../service/EmbeddedCassandraServiceTest.java   |    2 +-
 .../service/StorageServiceServerTest.java       |    3 +-
 54 files changed, 2792 insertions(+), 2957 deletions(-)
----------------------------------------------------------------------
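At a high level, the patch removes the per-class schema serialization methods (toSchema/fromSchema/dropFromSchema and friends) from CFMetaData, ColumnDefinition, KSMetaData, TriggerDefinition and UTMetaData, deletes DefsTables and DatabaseDescriptor.loadSchemas(), and concentrates that logic in the new org.apache.cassandra.schema.LegacySchemaTables class. The sketch below is illustrative only and is not part of the patch; it shows the shape of the new loading entry point, assuming a context where Schema.instance is reachable (the wrapper class and method name are hypothetical).

    import org.apache.cassandra.config.Schema;

    public final class SchemaBootstrapSketch
    {
        public static void loadSchemaAtStartup()
        {
            // The Schema constructor now loads the hardcoded system keyspace
            // definitions itself (see the new Schema() body further down).
            // loadFromDisk() then reads the user schema from the legacy schema
            // tables via LegacySchemaTables.readSchemaFromSystemTables() and
            // recomputes the schema version digest, replacing the removed
            // DatabaseDescriptor.loadSchemas().
            Schema.instance.loadFromDisk();
        }
    }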


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3571c1e..6f4cdec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Modernize schema tables (CASSANDRA-8261)
  * Support for user-defined aggregation functions (CASSANDRA-8053)
  * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
  * Refactor SelectStatement, return IN results in natural order instead

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index eb78ec7..0730ba7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
@@ -50,14 +49,12 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.github.jamm.Unmetered;
 
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-import static org.apache.cassandra.utils.FBUtilities.json;
-
 /**
  * This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
  */
@@ -221,7 +218,7 @@ public final class CFMetaData
     public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
 
     // attribute setters that return the modified CFMetaData instance
-    public CFMetaData comment(String prop) { comment = Strings.nullToEmpty(prop); return this;}
+    public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
@@ -344,8 +341,8 @@ public final class CFMetaData
         // Depends on parent's cache setting, turn on its index CF's cache.
         // Row caching is never enabled; see CASSANDRA-5732
         CachingOptions indexCaching = parent.getCaching().keyCache.isEnabled()
-                             ? CachingOptions.KEYS_ONLY
-                             : CachingOptions.NONE;
+                                    ? CachingOptions.KEYS_ONLY
+                                    : CachingOptions.NONE;
 
         return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator, parent.cfId)
                              .keyValidator(info.type)
@@ -386,7 +383,8 @@ public final class CFMetaData
         return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, newCfId), this);
     }
 
-    static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
+    @VisibleForTesting
+    public static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
         List<ColumnDefinition> clonedColumns = new ArrayList<>(oldCFMD.allColumns().size());
         for (ColumnDefinition cd : oldCFMD.allColumns())
@@ -449,6 +447,11 @@ public final class CFMetaData
         return cfName.contains(".");
     }
 
+    public Map<ByteBuffer, ColumnDefinition> getColumnMetadata()
+    {
+        return columnMetadata;
+    }
+
     /**
      *
      * @return The name of the parent cf if this is a seconday index
@@ -723,14 +726,9 @@ public final class CFMetaData
 
     public void reload()
     {
-        Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, ksName, cfName);
-
-        if (cfDefRow.cf == null || !cfDefRow.cf.hasColumns())
-            throw new RuntimeException(String.format("%s not found in the schema definitions keyspace.", ksName + ":" + cfName));
-
         try
         {
-            apply(fromSchema(cfDefRow));
+            apply(LegacySchemaTables.createTableFromName(ksName, cfName));
         }
         catch (ConfigurationException e)
         {
@@ -739,13 +737,12 @@ public final class CFMetaData
     }
 
     /**
-     * Updates CFMetaData in-place to match cf_def
-     *
-     * *Note*: This method left package-private only for DefsTest, don't use directly!
+     * Updates CFMetaData in-place to match cfm
      *
      * @throws ConfigurationException if ks/cf names or cf ids didn't match
      */
-    void apply(CFMetaData cfm) throws ConfigurationException
+    @VisibleForTesting
+    public void apply(CFMetaData cfm) throws ConfigurationException
     {
         logger.debug("applying {} to {}", cfm, this);
 
@@ -1116,89 +1113,6 @@ public final class CFMetaData
                                                            "interval (%d).", maxIndexInterval, minIndexInterval));
     }
 
-    /**
-     * Create schema mutations to update this metadata to provided new state.
-     *
-     * @param newState The new metadata (for the same CF)
-     * @param modificationTimestamp Timestamp to use for mutation
-     * @param fromThrift whether the newState comes from thrift
-     *
-     * @return Difference between attributes in form of schema mutation
-     */
-    public Mutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
-
-        newState.toSchemaNoColumnsNoTriggers(mutation, modificationTimestamp);
-
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
-
-        // columns that are no longer needed
-        for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
-        {
-            // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type
-            // are being deleted just because they are not here.
-            if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
-                continue;
-
-            cd.deleteFromSchema(mutation, modificationTimestamp);
-        }
-
-        // newly added columns
-        for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            cd.toSchema(mutation, modificationTimestamp);
-
-        // old columns with updated attributes
-        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-        {
-            ColumnDefinition cd = newState.columnMetadata.get(name);
-            cd.toSchema(mutation, modificationTimestamp);
-        }
-
-        MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
-
-        // dropped triggers
-        for (TriggerDefinition td : triggerDiff.entriesOnlyOnLeft().values())
-            td.deleteFromSchema(mutation, cfName, modificationTimestamp);
-
-        // newly created triggers
-        for (TriggerDefinition td : triggerDiff.entriesOnlyOnRight().values())
-            td.toSchema(mutation, cfName, modificationTimestamp);
-
-        return mutation;
-    }
-
-    /**
-     * Remove all CF attributes from schema
-     *
-     * @param timestamp Timestamp to use
-     *
-     * @return Mutation to use to completely remove cf from schema
-     */
-    public Mutation dropFromSchema(long timestamp)
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName);
-        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        for (ColumnDefinition cd : allColumns())
-            cd.deleteFromSchema(mutation, timestamp);
-
-        for (TriggerDefinition td : triggers.values())
-            td.deleteFromSchema(mutation, cfName, timestamp);
-
-        for (String indexName : Keyspace.open(this.ksName).getColumnFamilyStore(this.cfName).getBuiltIndexes())
-        {
-            ColumnFamily indexCf = mutation.addOrGet(SystemKeyspace.BuiltIndexesTable);
-            indexCf.addTombstone(indexCf.getComparator().makeCellName(indexName), ldt, timestamp);
-        }
-
-        return mutation;
-    }
-
     public boolean isPurged()
     {
         return isPurged;
@@ -1209,215 +1123,6 @@ public final class CFMetaData
         isPurged = true;
     }
 
-    public void toSchema(Mutation mutation, long timestamp)
-    {
-        toSchemaNoColumnsNoTriggers(mutation, timestamp);
-
-        for (TriggerDefinition td : triggers.values())
-            td.toSchema(mutation, cfName, timestamp);
-
-        for (ColumnDefinition cd : allColumns())
-            cd.toSchema(mutation, timestamp);
-    }
-
-    private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
-    {
-        // For property that can be null (and can be changed), we insert tombstones, to make sure
-        // we don't keep a property the user has removed
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable);
-        Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName);
-        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
-        adder.add("cf_id", cfId);
-        adder.add("type", cfType.toString());
-
-        if (isSuper())
-        {
-            // We need to continue saving the comparator and subcomparator separatly, otherwise
-            // we won't know at deserialization if the subcomparator should be taken into account
-            // TODO: we should implement an on-start migration if we want to get rid of that.
-            adder.add("comparator", comparator.subtype(0).toString());
-            adder.add("subcomparator", comparator.subtype(1).toString());
-        }
-        else
-        {
-            adder.add("comparator", comparator.toString());
-        }
-
-        adder.add("comment", comment);
-        adder.add("read_repair_chance", readRepairChance);
-        adder.add("local_read_repair_chance", dcLocalReadRepairChance);
-        adder.add("gc_grace_seconds", gcGraceSeconds);
-        adder.add("default_validator", defaultValidator.toString());
-        adder.add("key_validator", keyValidator.toString());
-        adder.add("min_compaction_threshold", minCompactionThreshold);
-        adder.add("max_compaction_threshold", maxCompactionThreshold);
-        adder.add("bloom_filter_fp_chance", getBloomFilterFpChance());
-        adder.add("memtable_flush_period_in_ms", memtableFlushPeriod);
-        adder.add("caching", caching.toString());
-        adder.add("default_time_to_live", defaultTimeToLive);
-        adder.add("compaction_strategy_class", compactionStrategyClass.getName());
-        adder.add("compression_parameters", json(compressionParameters.asThriftOptions()));
-        adder.add("compaction_strategy_options", json(compactionStrategyOptions));
-        adder.add("min_index_interval", minIndexInterval);
-        adder.add("max_index_interval", maxIndexInterval);
-        adder.add("speculative_retry", speculativeRetry.toString());
-
-        for (Map.Entry<ColumnIdentifier, Long> entry : droppedColumns.entrySet())
-            adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
-
-        adder.add("is_dense", isDense);
-    }
-
-    @VisibleForTesting
-    public static CFMetaData fromSchemaNoTriggers(UntypedResultSet.Row result, UntypedResultSet serializedColumnDefinitions)
-    {
-        try
-        {
-            String ksName = result.getString("keyspace_name");
-            String cfName = result.getString("columnfamily_name");
-
-            AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
-            AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
-            ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
-
-            AbstractType<?> fullRawComparator = makeRawAbstractType(rawComparator, subComparator);
-
-            List<ColumnDefinition> columnDefs = ColumnDefinition.fromSchema(serializedColumnDefinitions,
-                                                                            ksName,
-                                                                            cfName,
-                                                                            fullRawComparator,
-                                                                            cfType == ColumnFamilyType.Super);
-
-            boolean isDense = result.has("is_dense")
-                            ? result.getBoolean("is_dense")
-                            : calculateIsDense(fullRawComparator, columnDefs);
-
-            CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
-
-            // if we are upgrading, we use id generated from names initially
-            UUID cfId = result.has("cf_id")
-                      ? result.getUUID("cf_id")
-                      : generateLegacyCfId(ksName, cfName);
-
-            CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
-            cfm.isDense(isDense);
-
-            cfm.readRepairChance(result.getDouble("read_repair_chance"));
-            cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
-            cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
-            cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
-            cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
-            cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
-            cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
-            if (result.has("comment"))
-                cfm.comment(result.getString("comment"));
-            if (result.has("memtable_flush_period_in_ms"))
-                cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
-            cfm.caching(CachingOptions.fromString(result.getString("caching")));
-            if (result.has("default_time_to_live"))
-                cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
-            if (result.has("speculative_retry"))
-                cfm.speculativeRetry(SpeculativeRetry.fromString(result.getString("speculative_retry")));
-            cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
-            cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
-            cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
-
-            if (result.has("min_index_interval"))
-                cfm.minIndexInterval(result.getInt("min_index_interval"));
-
-            if (result.has("max_index_interval"))
-                cfm.maxIndexInterval(result.getInt("max_index_interval"));
-
-            if (result.has("bloom_filter_fp_chance"))
-                cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
-            else
-                cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
-
-            if (result.has("dropped_columns"))
-                cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
-
-            for (ColumnDefinition cd : columnDefs)
-                cfm.addOrReplaceColumnDefinition(cd);
-
-            return cfm.rebuild();
-        }
-        catch (SyntaxException | ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
-    {
-        if (comparator instanceof CompositeType)
-        {
-            CompositeType ct = (CompositeType)comparator;
-            for (int i = 0; i < aliases.size(); ++i)
-            {
-                if (aliases.get(i) != null)
-                {
-                    addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(i), ct.types.get(i), i, kind));
-                }
-            }
-        }
-        else
-        {
-            assert aliases.size() <= 1;
-            if (!aliases.isEmpty() && aliases.get(0) != null)
-                addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(0), comparator, null, kind));
-        }
-    }
-
-    /**
-     * Deserialize CF metadata from low-level representation
-     *
-     * @return Metadata deserialized from schema
-     */
-    public static CFMetaData fromSchema(UntypedResultSet.Row result)
-    {
-        String ksName = result.getString("keyspace_name");
-        String cfName = result.getString("columnfamily_name");
-
-        Row serializedColumns = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_TABLE, ksName, cfName);
-        CFMetaData cfm = fromSchemaNoTriggers(result, ColumnDefinition.resultify(serializedColumns));
-
-        Row serializedTriggers = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, ksName, cfName);
-        addTriggerDefinitionsFromSchema(cfm, serializedTriggers);
-
-        return cfm;
-    }
-
-    private static CFMetaData fromSchema(Row row)
-    {
-        UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row).one();
-        return fromSchema(result);
-    }
-
-    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
-    {
-        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
-        for (Map.Entry<String, Long> entry : raw.entrySet())
-            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
-        return converted;
-    }
-
-    /**
-     * Convert current metadata into schema mutation
-     *
-     * @param timestamp Timestamp to use
-     *
-     * @return Low-level representation of the CF
-     *
-     * @throws ConfigurationException if any of the attributes didn't pass validation
-     */
-    public Mutation toSchema(long timestamp) throws ConfigurationException
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
-        toSchema(mutation, timestamp);
-        return mutation;
-    }
-
     // The comparator to validate the definition name.
 
     public AbstractType<?> getColumnDefinitionComparator(ColumnDefinition def)
@@ -1474,12 +1179,6 @@ public final class CFMetaData
         return columnMetadata.remove(def.name.bytes) != null;
     }
 
-    private static void addTriggerDefinitionsFromSchema(CFMetaData cfDef, Row serializedTriggerDefinitions)
-    {
-        for (TriggerDefinition td : TriggerDefinition.fromSchema(serializedTriggerDefinitions))
-            cfDef.triggers.put(td.name, td);
-    }
-
     public void addTriggerDefinition(TriggerDefinition def) throws InvalidRequestException
     {
         if (containsTriggerDefinition(def))
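The two members this file's hunks make public for testing, copyOpts() and apply(), are also how reload() is now implemented on top of LegacySchemaTables. The following is a hedged, illustrative sketch (not part of the patch) of the round-trip a test might perform; the wrapper class and method name are hypothetical.

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.exceptions.ConfigurationException;
    import org.apache.cassandra.schema.LegacySchemaTables;

    public final class CFMetaDataReloadSketch
    {
        /** Re-reads a table definition from the schema tables and applies it in place. */
        public static CFMetaData roundTrip(CFMetaData table) throws ConfigurationException
        {
            // createTableFromName() is the new LegacySchemaTables entry point that
            // CFMetaData.reload() delegates to in this patch.
            CFMetaData fromDisk = LegacySchemaTables.createTableFromName(table.ksName, table.cfName);

            // apply() (made public and @VisibleForTesting above) updates the metadata
            // in place and throws ConfigurationException on ks/cf name or id mismatch.
            table.apply(fromDisk);
            return table;
        }
    }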

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 354a6f1..1cc7f1d 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -26,25 +26,11 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.utils.FBUtilities.json;
 
 public class ColumnDefinition extends ColumnSpecification
 {
-    // system.schema_columns column names
-    private static final String COLUMN_NAME = "column_name";
-    private static final String TYPE = "validator";
-    private static final String INDEX_TYPE = "index_type";
-    private static final String INDEX_OPTIONS = "index_options";
-    private static final String INDEX_NAME = "index_name";
-    private static final String COMPONENT_INDEX = "component_index";
-    private static final String KIND = "type";
-
     /*
      * The type of CQL3 column this definition represents.
      * There is 3 main type of CQL3 columns: those parts of the partition key,
@@ -62,20 +48,7 @@ public class ColumnDefinition extends ColumnSpecification
         CLUSTERING_COLUMN,
         REGULAR,
         STATIC,
-        COMPACT_VALUE;
-
-        public String serialize()
-        {
-            // For backward compatibility we need to special case CLUSTERING_COLUMN
-            return this == CLUSTERING_COLUMN ? "clustering_key" : this.toString().toLowerCase();
-        }
-
-        public static Kind deserialize(String value)
-        {
-            if (value.equalsIgnoreCase("clustering_key"))
-                return CLUSTERING_COLUMN;
-            return Enum.valueOf(Kind.class, value.toUpperCase());
-        }
+        COMPACT_VALUE
     }
 
     public final Kind kind;
@@ -266,36 +239,6 @@ public class ColumnDefinition extends ColumnSpecification
         return kind == Kind.REGULAR || kind == Kind.STATIC;
     }
 
-    /**
-     * Drop specified column from the schema using given mutation.
-     *
-     * @param mutation  The schema mutation
-     * @param timestamp The timestamp to use for column modification
-     */
-    public void deleteFromSchema(Mutation mutation, long timestamp)
-    {
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
-        Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString());
-        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-    }
-
-    public void toSchema(Mutation mutation, long timestamp)
-    {
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable);
-        Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString());
-        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
-        adder.add(TYPE, type.toString());
-        adder.add(INDEX_TYPE, indexType == null ? null : indexType.toString());
-        adder.add(INDEX_OPTIONS, json(indexOptions));
-        adder.add(INDEX_NAME, indexName);
-        adder.add(COMPONENT_INDEX, componentIndex);
-        adder.add(KIND, kind.serialize());
-    }
-
     public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
     {
         assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex);
@@ -323,81 +266,6 @@ public class ColumnDefinition extends ColumnSpecification
                                     kind);
     }
 
-    public static UntypedResultSet resultify(Row serializedColumns)
-    {
-        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_COLUMNS_TABLE);
-        return QueryProcessor.resultify(query, serializedColumns);
-    }
-
-    /**
-     * Deserialize columns from storage-level representation
-     *
-     * @param serializedColumns storage-level partition containing the column definitions
-     * @return the list of processed ColumnDefinitions
-     */
-    public static List<ColumnDefinition> fromSchema(UntypedResultSet serializedColumns, String ksName, String cfName, AbstractType<?> rawComparator, boolean isSuper)
-    {
-        List<ColumnDefinition> cds = new ArrayList<>();
-        for (UntypedResultSet.Row row : serializedColumns)
-        {
-            Kind kind = row.has(KIND)
-                      ? Kind.deserialize(row.getString(KIND))
-                      : Kind.REGULAR;
-
-            Integer componentIndex = null;
-            if (row.has(COMPONENT_INDEX))
-                componentIndex = row.getInt(COMPONENT_INDEX);
-            else if (kind == Kind.CLUSTERING_COLUMN && isSuper)
-                componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
-
-            // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
-            // we need to use the comparator fromString method
-            AbstractType<?> comparator = getComponentComparator(rawComparator, componentIndex, kind);
-            ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator);
-
-            AbstractType<?> validator;
-            try
-            {
-                validator = TypeParser.parse(row.getString(TYPE));
-            }
-            catch (RequestValidationException e)
-            {
-                throw new RuntimeException(e);
-            }
-
-            IndexType indexType = null;
-            if (row.has(INDEX_TYPE))
-                indexType = IndexType.valueOf(row.getString(INDEX_TYPE));
-
-            Map<String, String> indexOptions = null;
-            if (row.has(INDEX_OPTIONS))
-                indexOptions = FBUtilities.fromJsonMap(row.getString(INDEX_OPTIONS));
-
-            String indexName = null;
-            if (row.has(INDEX_NAME))
-                indexName = row.getString(INDEX_NAME);
-
-            cds.add(new ColumnDefinition(ksName, cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
-        }
-
-        return cds;
-    }
-
-    public static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex, ColumnDefinition.Kind kind)
-    {
-        switch (kind)
-        {
-            case REGULAR:
-                if (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
-                    return rawComparator;
-
-                return ((CompositeType)rawComparator).types.get(componentIndex);
-            default:
-                // CQL3 column names are UTF8
-                return UTF8Type.instance;
-        }
-    }
-
     public String getIndexName()
     {
         return indexName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a0e84f9..f2897ee 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -18,49 +18,29 @@
 package org.apache.cassandra.config;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.net.*;
+import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Longs;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.AllowAllAuthenticator;
-import org.apache.cassandra.auth.AllowAllAuthorizer;
-import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.auth.IAuthorizer;
-import org.apache.cassandra.auth.IInternodeAuthenticator;
+
+import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.Config.RequestSchedulerId;
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DefsTables;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.IAllocator;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.EndpointSnitchInfo;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
@@ -69,10 +49,7 @@ import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.memory.HeapPool;
-import org.apache.cassandra.utils.memory.NativePool;
-import org.apache.cassandra.utils.memory.MemtablePool;
-import org.apache.cassandra.utils.memory.SlabPool;
+import org.apache.cassandra.utils.memory.*;
 
 public class DatabaseDescriptor
 {
@@ -585,9 +562,6 @@ public class DatabaseDescriptor
             conf.server_encryption_options = conf.encryption_options;
         }
 
-        // hardcoded system keyspace
-        Schema.instance.load(SystemKeyspace.definition());
-
         // load the seeds for node contact points
         if (conf.seed_provider == null)
         {
@@ -620,53 +594,6 @@ public class DatabaseDescriptor
         return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
     }
 
-    /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
-    public static void loadSchemas()
-    {
-        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_TABLE);
-
-        // if keyspace with definitions is empty try loading the old way
-        if (schemaCFS.estimateKeys() == 0)
-        {
-            logger.info("Couldn't detect any schema definitions in local storage.");
-            // peek around the data directories to see if anything is there.
-            if (hasExistingNoSystemTables())
-                logger.info("Found keyspace data in data directories. Consider using cqlsh to define your schema.");
-            else
-                logger.info("To create keyspaces and column families, see 'help create' in cqlsh.");
-        }
-        else
-        {
-            Schema.instance.load(DefsTables.loadFromKeyspace());
-        }
-
-        Schema.instance.updateVersion();
-    }
-
-    private static boolean hasExistingNoSystemTables()
-    {
-        for (String dataDir : getAllDataFileLocations())
-        {
-            File dataPath = new File(dataDir);
-            if (dataPath.exists() && dataPath.isDirectory())
-            {
-                // see if there are other directories present.
-                int dirCount = dataPath.listFiles(new FileFilter()
-                {
-                    public boolean accept(File pathname)
-                    {
-                        return pathname.isDirectory() && !pathname.getName().equals(SystemKeyspace.NAME);
-                    }
-                }).length;
-
-                if (dirCount > 0)
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
     public static IAuthenticator getAuthenticator()
     {
         return authenticator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index e5576ad..1537aae 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -21,15 +21,10 @@ import java.util.*;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.service.StorageService;
 
-import static org.apache.cassandra.utils.FBUtilities.*;
-
 public final class KSMetaData
 {
     public final String name;
@@ -43,18 +38,26 @@ public final class KSMetaData
     public KSMetaData(String name,
                       Class<? extends AbstractReplicationStrategy> strategyClass,
                       Map<String, String> strategyOptions,
+                      boolean durableWrites)
+    {
+        this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData());
+    }
+
+    public KSMetaData(String name,
+                      Class<? extends AbstractReplicationStrategy> strategyClass,
+                      Map<String, String> strategyOptions,
                       boolean durableWrites,
                       Iterable<CFMetaData> cfDefs)
     {
         this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
     }
 
-    KSMetaData(String name,
-               Class<? extends AbstractReplicationStrategy> strategyClass,
-               Map<String, String> strategyOptions,
-               boolean durableWrites,
-               Iterable<CFMetaData> cfDefs,
-               UTMetaData userTypes)
+    private KSMetaData(String name,
+                       Class<? extends AbstractReplicationStrategy> strategyClass,
+                       Map<String, String> strategyOptions,
+                       boolean durableWrites,
+                       Iterable<CFMetaData> cfDefs,
+                       UTMetaData userTypes)
     {
         this.name = name;
         this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
@@ -82,9 +85,27 @@ public final class KSMetaData
         return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData());
     }
 
-    public static KSMetaData cloneWith(KSMetaData ksm, Iterable<CFMetaData> cfDefs)
+    public KSMetaData cloneWithTableRemoved(CFMetaData table)
+    {
+        // clone ksm but do not include the new table
+        List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
+        newTables.remove(table);
+        assert newTables.size() == cfMetaData().size() - 1;
+        return cloneWith(newTables, userTypes);
+    }
+
+    public KSMetaData cloneWithTableAdded(CFMetaData table)
+    {
+        // clone ksm but include the new table
+        List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
+        newTables.add(table);
+        assert newTables.size() == cfMetaData().size() + 1;
+        return cloneWith(newTables, userTypes);
+    }
+
+    public KSMetaData cloneWith(Iterable<CFMetaData> tables, UTMetaData types)
     {
-        return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs, ksm.userTypes);
+        return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types);
     }
 
     public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
@@ -145,11 +166,6 @@ public final class KSMetaData
         return Collections.singletonMap("replication_factor", rf.toString());
     }
 
-    public Mutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
-    {
-        return newState.toSchema(modificationTimestamp);
-    }
-
     public KSMetaData validate() throws ConfigurationException
     {
         if (!CFMetaData.isNameValid(name))
@@ -165,107 +181,4 @@ public final class KSMetaData
 
         return this;
     }
-
-    public KSMetaData reloadAttributes()
-    {
-        Row ksDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, name);
-
-        if (ksDefRow.cf == null)
-            throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", name, SystemKeyspace.SCHEMA_KEYSPACES_TABLE));
-
-        return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList(), userTypes);
-    }
-
-    public Mutation dropFromSchema(long timestamp)
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name));
-
-        mutation.delete(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, timestamp);
-        mutation.delete(SystemKeyspace.BUILT_INDEXES_TABLE, timestamp);
-
-        return mutation;
-    }
-
-    public Mutation toSchema(long timestamp)
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name));
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaKeyspacesTable);
-        CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.SchemaKeyspacesTable.comparator.builder().build(), timestamp);
-
-        adder.add("durable_writes", durableWrites);
-        adder.add("strategy_class", strategyClass.getName());
-        adder.add("strategy_options", json(strategyOptions));
-
-        for (CFMetaData cfm : cfMetaData.values())
-            cfm.toSchema(mutation, timestamp);
-
-        userTypes.toSchema(mutation, timestamp);
-        return mutation;
-    }
-
-    /**
-     * Deserialize only Keyspace attributes without nested ColumnFamilies
-     *
-     * @param row Keyspace attributes in serialized form
-     *
-     * @return deserialized keyspace without cf_defs
-     */
-    public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms, UTMetaData userTypes)
-    {
-        UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one();
-        try
-        {
-            return new KSMetaData(result.getString("keyspace_name"),
-                                  AbstractReplicationStrategy.getClass(result.getString("strategy_class")),
-                                  fromJsonMap(result.getString("strategy_options")),
-                                  result.getBoolean("durable_writes"),
-                                  cfms,
-                                  userTypes);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Deserialize Keyspace with nested ColumnFamilies
-     *
-     * @param serializedKs Keyspace in serialized form
-     * @param serializedCFs Collection of the serialized ColumnFamilies
-     *
-     * @return deserialized keyspace with cf_defs
-     */
-    public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs, Row serializedUserTypes)
-    {
-        Map<String, CFMetaData> cfs = deserializeColumnFamilies(serializedCFs);
-        UTMetaData userTypes = new UTMetaData(UTMetaData.fromSchema(serializedUserTypes));
-        return fromSchema(serializedKs, cfs.values(), userTypes);
-    }
-
-    /**
-     * Deserialize ColumnFamilies from low-level schema representation, all of them belong to the same keyspace
-     *
-     * @return map containing name of the ColumnFamily and it's metadata for faster lookup
-     */
-    public static Map<String, CFMetaData> deserializeColumnFamilies(Row row)
-    {
-        if (row.cf == null)
-            return Collections.emptyMap();
-
-        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row);
-        Map<String, CFMetaData> cfms = new HashMap<>(results.size());
-        for (UntypedResultSet.Row result : results)
-        {
-            CFMetaData cfm = CFMetaData.fromSchema(result);
-            cfms.put(cfm.cfName, cfm);
-        }
-        return cfms;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 43cc6b5..21244ab 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.config;
 
-import java.nio.charset.CharacterCodingException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
@@ -27,13 +26,18 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ConcurrentBiMap;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -78,10 +82,20 @@ public class Schema
     }
 
     /**
-     * Initialize empty schema object
+     * Initialize empty schema object and load the hardcoded system tables
      */
     public Schema()
-    {}
+    {
+        load(SystemKeyspace.definition());
+    }
+
+    /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
+    public Schema loadFromDisk()
+    {
+        load(LegacySchemaTables.readSchemaFromSystemTables());
+        updateVersion();
+        return this;
+    }
 
     /**
      * Load up non-system keyspaces
@@ -350,28 +364,8 @@ public class Schema
      */
     public void updateVersion()
     {
-        try
-        {
-            MessageDigest versionDigest = MessageDigest.getInstance("MD5");
-
-            for (Row row : SystemKeyspace.serializedSchema())
-            {
-                if (invalidSchemaRow(row) || ignoredSchemaRow(row))
-                    continue;
-
-                // we want to digest only live columns
-                ColumnFamilyStore.removeDeletedColumnsOnly(row.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
-                row.cf.purgeTombstones(Integer.MAX_VALUE);
-                row.cf.updateDigest(versionDigest);
-            }
-
-            version = UUID.nameUUIDFromBytes(versionDigest.digest());
-            SystemKeyspace.updateSchemaVersion(version);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        version = LegacySchemaTables.calculateSchemaDigest();
+        SystemKeyspace.updateSchemaVersion(version);
     }
 
     /*
@@ -399,20 +393,202 @@ public class Schema
         updateVersionAndAnnounce();
     }
 
-    public static boolean invalidSchemaRow(Row row)
+    public void addKeyspace(KSMetaData ksm)
     {
-        return row.cf == null || (row.cf.isMarkedForDelete() && !row.cf.hasColumns());
+        assert getKSMetaData(ksm.name) == null;
+        load(ksm);
+
+        Keyspace.open(ksm.name);
+        MigrationManager.instance.notifyCreateKeyspace(ksm);
     }
 
-    public static boolean ignoredSchemaRow(Row row)
+    public void updateKeyspace(String ksName)
     {
-        try
-        {
-            return ByteBufferUtil.string(row.key.getKey()).equals(SystemKeyspace.NAME);
-        }
-        catch (CharacterCodingException e)
+        KSMetaData oldKsm = getKSMetaData(ksName);
+        assert oldKsm != null;
+        KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes);
+
+        setKeyspaceDefinition(newKsm);
+
+        Keyspace.open(ksName).createReplicationStrategy(newKsm);
+        MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+    }
+
+    public void dropKeyspace(String ksName)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+        String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
+
+        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
+
+        Keyspace keyspace = Keyspace.open(ksm.name);
+
+        // remove all cfs from the keyspace instance.
+        List<UUID> droppedCfs = new ArrayList<>();
+        for (CFMetaData cfm : ksm.cfMetaData().values())
         {
-            throw new RuntimeException(e);
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
+
+            purge(cfm);
+
+            if (DatabaseDescriptor.isAutoSnapshot())
+                cfs.snapshot(snapshotName);
+            Keyspace.open(ksm.name).dropCf(cfm.cfId);
+
+            droppedCfs.add(cfm.cfId);
         }
+
+        // remove the keyspace from the static instances.
+        Keyspace.clear(ksm.name);
+        clearKeyspaceDefinition(ksm);
+
+        keyspace.writeOrder.awaitNewBarrier();
+
+        // force a new segment in the CL
+        CommitLog.instance.forceRecycleAllSegments(droppedCfs);
+
+        MigrationManager.instance.notifyDropKeyspace(ksm);
+    }
+
+    public void addTable(CFMetaData cfm)
+    {
+        assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
+        KSMetaData ksm = getKSMetaData(cfm.ksName).cloneWithTableAdded(cfm);
+
+        logger.info("Loading {}", cfm);
+
+        load(cfm);
+
+        // make sure it's init-ed w/ the old definitions first,
+        // since we're going to call initCf on the new one manually
+        Keyspace.open(cfm.ksName);
+
+        setKeyspaceDefinition(ksm);
+        Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
+        MigrationManager.instance.notifyCreateColumnFamily(cfm);
+    }
+
+    public void updateTable(String ksName, String tableName)
+    {
+        CFMetaData cfm = getCFMetaData(ksName, tableName);
+        assert cfm != null;
+        cfm.reload();
+
+        Keyspace keyspace = Keyspace.open(cfm.ksName);
+        keyspace.getColumnFamilyStore(cfm.cfName).reload();
+        MigrationManager.instance.notifyUpdateColumnFamily(cfm);
+    }
+
+    public void dropTable(String ksName, String tableName)
+    {
+        KSMetaData ksm = getKSMetaData(ksName);
+        assert ksm != null;
+        ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(tableName);
+        assert cfs != null;
+
+        // reinitialize the keyspace.
+        CFMetaData cfm = ksm.cfMetaData().get(tableName);
+
+        purge(cfm);
+        setKeyspaceDefinition(ksm.cloneWithTableRemoved(cfm));
+
+        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+
+        if (DatabaseDescriptor.isAutoSnapshot())
+            cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
+        Keyspace.open(ksm.name).dropCf(cfm.cfId);
+        MigrationManager.instance.notifyDropColumnFamily(cfm);
+
+        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
+    }
+
+    public void addType(UserType ut)
+    {
+        KSMetaData ksm = getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        logger.info("Loading {}", ut);
+
+        ksm.userTypes.addType(ut);
+
+        MigrationManager.instance.notifyCreateUserType(ut);
+    }
+
+    public void updateType(UserType ut)
+    {
+        KSMetaData ksm = getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        logger.info("Updating {}", ut);
+
+        ksm.userTypes.addType(ut);
+
+        MigrationManager.instance.notifyUpdateUserType(ut);
+    }
+
+    public void dropType(UserType ut)
+    {
+        KSMetaData ksm = getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        ksm.userTypes.removeType(ut);
+
+        MigrationManager.instance.notifyDropUserType(ut);
+    }
+
+    public void addFunction(UDFunction udf)
+    {
+        logger.info("Loading {}", udf);
+
+        Functions.addFunction(udf);
+
+        MigrationManager.instance.notifyCreateFunction(udf);
+    }
+
+    public void updateFunction(UDFunction udf)
+    {
+        logger.info("Updating {}", udf);
+
+        Functions.replaceFunction(udf);
+
+        MigrationManager.instance.notifyUpdateFunction(udf);
+    }
+
+    public void dropFunction(UDFunction udf)
+    {
+        logger.info("Drop {}", udf);
+
+        // TODO: this is kind of broken as this remove all overloads of the function name
+        Functions.removeFunction(udf.name(), udf.argTypes());
+
+        MigrationManager.instance.notifyDropFunction(udf);
+    }
+
+    public void addAggregate(UDAggregate udf)
+    {
+        logger.info("Loading {}", udf);
+
+        Functions.addFunction(udf);
+
+        MigrationManager.instance.notifyCreateAggregate(udf);
+    }
+
+    public void updateAggregate(UDAggregate udf)
+    {
+        logger.info("Updating {}", udf);
+
+        Functions.replaceFunction(udf);
+
+        MigrationManager.instance.notifyUpdateAggregate(udf);
+    }
+
+    public void dropAggregate(UDAggregate udf)
+    {
+        logger.info("Drop {}", udf);
+
+        // TODO: this is kind of broken as this remove all overloads of the function name
+        Functions.removeFunction(udf.name(), udf.argTypes());
+
+        MigrationManager.instance.notifyDropAggregate(udf);
     }
 }
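The bulk of the apply logic previously in DefsTables now lives in these new lifecycle methods on Schema (addKeyspace/updateKeyspace/dropKeyspace, addTable/updateTable/dropTable, plus the type, function and aggregate equivalents). The sketch below is illustrative only and not part of the patch; it shows the expected call pattern, with a hypothetical wrapper class and a CFMetaData argument assumed to be already built for an existing keyspace.

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.config.Schema;

    public final class SchemaLifecycleSketch
    {
        /** Applies a newly announced table definition to the local schema. */
        public static void onTableCreated(CFMetaData cfm)
        {
            // addTable() loads the metadata, clones the owning KSMetaData via
            // cloneWithTableAdded(), initializes the ColumnFamilyStore and fires
            // MigrationManager.instance.notifyCreateColumnFamily().
            Schema.instance.addTable(cfm);
        }

        /** Applies an altered or dropped table, addressed by name. */
        public static void onTableChanged(String ksName, String tableName, boolean dropped)
        {
            if (dropped)
                Schema.instance.dropTable(ksName, tableName);   // snapshots (if enabled), purges, notifies
            else
                Schema.instance.updateTable(ksName, tableName); // reload()s the CFMetaData and the CFS
        }
    }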

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index a395549..6a84379 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -18,20 +18,10 @@
  */
 package org.apache.cassandra.config;
 
-import java.util.*;
-
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.UTF8Type;
-
 public class TriggerDefinition
 {
-    public static final String TRIGGER_NAME = "trigger_name";
-    public static final String TRIGGER_OPTIONS = "trigger_options";
     public static final String CLASS = "class";
 
     public final String name;
@@ -51,59 +41,6 @@ public class TriggerDefinition
         return new TriggerDefinition(name, classOption);
     }
 
-    /**
-     * Deserialize triggers from storage-level representation.
-     *
-     * @param serializedTriggers storage-level partition containing the trigger definitions
-     * @return the list of processed TriggerDefinitions
-     */
-    public static List<TriggerDefinition> fromSchema(Row serializedTriggers)
-    {
-        List<TriggerDefinition> triggers = new ArrayList<>();
-        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedTriggers))
-        {
-            String name = row.getString(TRIGGER_NAME);
-            String classOption = row.getMap(TRIGGER_OPTIONS, UTF8Type.instance, UTF8Type.instance).get(CLASS);
-            triggers.add(new TriggerDefinition(name, classOption));
-        }
-        return triggers;
-    }
-
-    /**
-     * Add specified trigger to the schema using given mutation.
-     *
-     * @param mutation  The schema mutation
-     * @param cfName    The name of the parent ColumnFamily
-     * @param timestamp The timestamp to use for the columns
-     */
-    public void toSchema(Mutation mutation, String cfName, long timestamp)
-    {
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
-
-        CFMetaData cfm = SystemKeyspace.SchemaTriggersTable;
-        Composite prefix = cfm.comparator.make(cfName, name);
-        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
-        adder.addMapEntry(TRIGGER_OPTIONS, CLASS, classOption);
-    }
-
-    /**
-     * Drop specified trigger from the schema using given mutation.
-     *
-     * @param mutation  The schema mutation
-     * @param cfName    The name of the parent ColumnFamily
-     * @param timestamp The timestamp to use for the tombstone
-     */
-    public void deleteFromSchema(Mutation mutation, String cfName, long timestamp)
-    {
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = SystemKeyspace.SchemaTriggersTable.comparator.make(cfName, name);
-        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-    }
-
     @Override
     public boolean equals(Object o)
     {
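
With the storage-level (de)serialization removed, TriggerDefinition is left as a plain
value object. A minimal sketch, assuming the create(name, classOption) factory whose
body appears in the context above; the trigger name and class are made-up examples:

    import org.apache.cassandra.config.TriggerDefinition;

    public final class TriggerDefinitionSketch
    {
        public static void main(String[] args)
        {
            TriggerDefinition td = TriggerDefinition.create("audit", "org.example.AuditTrigger");
            // Value semantics: same name and class option compare equal.
            System.out.println(td.equals(TriggerDefinition.create("audit", "org.example.AuditTrigger")));
        }
    }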

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index 46a7a4f..08cedee 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -20,12 +20,7 @@ package org.apache.cassandra.config;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Defined (and loaded) user types.
@@ -42,91 +37,11 @@ public final class UTMetaData
         this(new HashMap<ByteBuffer, UserType>());
     }
 
-    UTMetaData(Map<ByteBuffer, UserType> types)
+    public UTMetaData(Map<ByteBuffer, UserType> types)
     {
         this.userTypes = types;
     }
 
-    private static UserType fromSchema(UntypedResultSet.Row row)
-    {
-        try
-        {
-            String keyspace = row.getString("keyspace_name");
-            ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
-            List<String> rawColumns = row.getList("field_names", UTF8Type.instance);
-            List<String> rawTypes = row.getList("field_types", UTF8Type.instance);
-
-            List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
-            for (String rawColumn : rawColumns)
-                columns.add(ByteBufferUtil.bytes(rawColumn));
-
-            List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
-            for (String rawType : rawTypes)
-                types.add(TypeParser.parse(rawType));
-
-            return new UserType(keyspace, name, columns, types);
-        }
-        catch (RequestValidationException e)
-        {
-            // If it has been written in the schema, it should be valid
-            throw new AssertionError();
-        }
-    }
-
-    public static Map<ByteBuffer, UserType> fromSchema(Row row)
-    {
-        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_TABLE, row);
-        Map<ByteBuffer, UserType> types = new HashMap<>(results.size());
-        for (UntypedResultSet.Row result : results)
-        {
-            UserType type = fromSchema(result);
-            types.put(type.name, type);
-        }
-        return types;
-    }
-
-    public static Mutation toSchema(UserType newType, long timestamp)
-    {
-        return toSchema(new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(newType.keyspace)), newType, timestamp);
-    }
-
-    public static Mutation toSchema(Mutation mutation, UserType newType, long timestamp)
-    {
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE);
-
-        Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(newType.name);
-        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
-        adder.resetCollection("field_names");
-        adder.resetCollection("field_types");
-
-        for (int i = 0; i < newType.size(); i++)
-        {
-            adder.addListEntry("field_names", newType.fieldName(i));
-            adder.addListEntry("field_types", newType.fieldType(i).toString());
-        }
-        return mutation;
-    }
-
-    public Mutation toSchema(Mutation mutation, long timestamp)
-    {
-        for (UserType ut : userTypes.values())
-            toSchema(mutation, ut, timestamp);
-        return mutation;
-    }
-
-    public static Mutation dropFromSchema(UserType droppedType, long timestamp)
-    {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(droppedType.keyspace));
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(droppedType.name);
-        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
-    }
-
     public UserType getType(ByteBuffer typeName)
     {
         return userTypes.get(typeName);
@@ -134,11 +49,11 @@ public final class UTMetaData
 
     public Map<ByteBuffer, UserType> getAllTypes()
     {
-        // Copy to avoid concurrent modification while iterating. Not intended to be called on a criticial path anyway
+        // Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway
         return new HashMap<>(userTypes);
     }
 
-    // This is *not* thread safe but is only called in DefsTables that is synchronized.
+    // This is *not* thread safe, but it is only called from Schema, which is synchronized.
     public void addType(UserType type)
     {
         UserType old = userTypes.get(type.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 7d94e47..b55ebc5 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -22,12 +22,9 @@ import java.util.Collection;
 import java.util.List;
 
 import com.google.common.collect.ArrayListMultimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.IMigrationListener;
@@ -35,16 +32,11 @@ import org.apache.cassandra.service.MigrationManager;
 
 public abstract class Functions
 {
-    private static final Logger logger = LoggerFactory.getLogger(Functions.class);
-
     // We special case the token function because that's the only function whose argument types actually
     // depend on the table on which the function is called. Because it's the sole exception, it's easier
     // to handle it as a special case.
     private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token");
 
-    private static final String SELECT_UD_FUNCTION = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE;
-    private static final String SELECT_UD_AGGREGATE = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_AGGREGATES_TABLE;
-
     private Functions() {}
 
     private static final ArrayListMultimap<FunctionName, Function> declared = ArrayListMultimap.create();
@@ -96,18 +88,6 @@ public abstract class Functions
         declared.put(fun.name(), fun);
     }
 
-    /**
-     * Loading existing UDFs from the schema.
-     */
-    public static void loadUDFFromSchema()
-    {
-        logger.debug("Loading UDFs");
-        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_FUNCTION))
-            addFunction(UDFunction.fromSchema(row));
-        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_AGGREGATE))
-            addFunction(UDAggregate.fromSchema(row));
-    }
-
     public static ColumnSpecification makeArgSpec(String receiverKs, String receiverCf, Function fun, int i)
     {
         return new ColumnSpecification(receiverKs,
@@ -270,7 +250,7 @@ public abstract class Functions
         return sb.toString();
     }
 
-    // This is *not* thread safe but is only called in DefsTables that is synchronized.
+    // This is *not* thread safe, but it is only called from Schema, which is synchronized.
     public static void addFunction(AbstractFunction fun)
     {
         // We shouldn't get here unless the function doesn't already exist

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 5b1f5bd..e4e6a55 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -76,9 +76,8 @@ public final class JavaSourceUDFFactory
         // It is separated to allow return type and argument type checks during compile time via javassist.
         String codeExecInt = generateExecuteInternalMethod(argNames, body, javaReturnType, javaParamTypes);
 
-        if (logger.isDebugEnabled())
-            logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}",
-                         name, codeCtor, codeExecInt, codeExec);
+        logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}",
+                     name, codeCtor, codeExecInt, codeExec);
 
         try
         {
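
The isDebugEnabled() guard is dropped here, presumably because slf4j's parameterized
logging only formats the message when DEBUG is actually enabled, and the argument
strings are already built either way. A self-contained illustration of the idiom (not
Cassandra code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingGuardSketch
    {
        private static final Logger logger = LoggerFactory.getLogger(LoggingGuardSketch.class);

        public static void main(String[] args)
        {
            String source = "public class Foo {}"; // stands in for the generated source
            // No guard needed: formatting is skipped entirely when DEBUG is off.
            logger.debug("Generated source:\n{}", source);

            // A guard still pays off only when building the argument itself is expensive.
            if (logger.isDebugEnabled())
                logger.debug("Stats: {}", expensiveStats());
        }

        private static String expensiveStats()
        {
            return "..."; // placeholder for costly work
        }
    }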

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index f259265..e9c33ba 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -24,12 +24,7 @@ import com.google.common.base.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.*;
 
 /**
@@ -58,6 +53,45 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
         this.initcond = initcond;
     }
 
+    public static UDAggregate create(FunctionName name,
+                                     List<AbstractType<?>> argTypes,
+                                     AbstractType<?> returnType,
+                                     FunctionName stateFunc,
+                                     FunctionName finalFunc,
+                                     AbstractType<?> stateType,
+                                     ByteBuffer initcond)
+    throws InvalidRequestException
+    {
+        List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
+        stateTypes.add(stateType);
+        stateTypes.addAll(argTypes);
+        List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
+        return new UDAggregate(name,
+                               argTypes,
+                               returnType,
+                               resolveScalar(name, stateFunc, stateTypes),
+                               finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null,
+                               initcond);
+    }
+
+    public static UDAggregate createBroken(FunctionName name,
+                                           List<AbstractType<?>> argTypes,
+                                           AbstractType<?> returnType,
+                                           ByteBuffer initcond,
+                                           final InvalidRequestException reason)
+    {
+        return new UDAggregate(name, argTypes, returnType, null, null, initcond)
+        {
+            public Aggregate newAggregate() throws InvalidRequestException
+            {
+                throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. "
+                                                                + "Please see the server log for more details",
+                                                                this,
+                                                                reason.getMessage()));
+            }
+        };
+    }
+
     public boolean hasReferenceTo(Function function)
     {
         return stateFunction == function || finalFunction == function;
@@ -85,6 +119,26 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
         return false;
     }
 
+    public ScalarFunction stateFunction()
+    {
+        return stateFunction;
+    }
+
+    public ScalarFunction finalFunction()
+    {
+        return finalFunction;
+    }
+
+    public ByteBuffer initialCondition()
+    {
+        return initcond;
+    }
+
+    public AbstractType<?> stateType()
+    {
+        return stateType;
+    }
+
     public Aggregate newAggregate() throws InvalidRequestException
     {
         return new Aggregate()
@@ -128,134 +182,6 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
         return (ScalarFunction) func;
     }
 
-    private static Mutation makeSchemaMutation(FunctionName name)
-    {
-        UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaAggregatesTable.getKeyValidator();
-        return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace));
-    }
-
-    public Mutation toSchemaDrop(long timestamp)
-    {
-        Mutation mutation = makeSchemaMutation(name);
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
-
-        Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
-    }
-
-    public static Map<Composite, UDAggregate> fromSchema(Row row)
-    {
-        UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_AGGREGATES_TABLE, row);
-        Map<Composite, UDAggregate> udfs = new HashMap<>(results.size());
-        for (UntypedResultSet.Row result : results)
-            udfs.put(SystemKeyspace.SchemaAggregatesTable.comparator.make(result.getString("aggregate_name"), result.getBlob("signature")),
-                     fromSchema(result));
-        return udfs;
-    }
-
-    public Mutation toSchemaUpdate(long timestamp)
-    {
-        Mutation mutation = makeSchemaMutation(name);
-        ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
-
-        Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
-        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
-        adder.resetCollection("argument_types");
-        adder.add("return_type", returnType.toString());
-        adder.add("state_func", stateFunction.name().name);
-        if (stateType != null)
-            adder.add("state_type", stateType.toString());
-        if (finalFunction != null)
-            adder.add("final_func", finalFunction.name().name);
-        if (initcond != null)
-            adder.add("initcond", initcond);
-
-        for (AbstractType<?> argType : argTypes)
-            adder.addListEntry("argument_types", argType.toString());
-
-        return mutation;
-    }
-
-    public static UDAggregate fromSchema(UntypedResultSet.Row row)
-    {
-        String ksName = row.getString("keyspace_name");
-        String functionName = row.getString("aggregate_name");
-        FunctionName name = new FunctionName(ksName, functionName);
-
-        List<String> types = row.getList("argument_types", UTF8Type.instance);
-
-        List<AbstractType<?>> argTypes;
-        if (types == null)
-        {
-            argTypes = Collections.emptyList();
-        }
-        else
-        {
-            argTypes = new ArrayList<>(types.size());
-            for (String type : types)
-                argTypes.add(parseType(type));
-        }
-
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
-
-        FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
-        FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
-        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
-        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
-
-        try
-        {
-            return create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
-        }
-        catch (InvalidRequestException reason)
-        {
-            return createBroken(name, argTypes, returnType, initcond, reason);
-        }
-    }
-
-    private static UDAggregate createBroken(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
-                                            ByteBuffer initcond, final InvalidRequestException reason)
-    {
-        return new UDAggregate(name, argTypes, returnType, null, null, initcond) {
-            public Aggregate newAggregate() throws InvalidRequestException
-            {
-                throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. "
-                                                                + "Please see the server log for more details", this, reason.getMessage()));
-            }
-        };
-    }
-
-    private static UDAggregate create(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
-                                      FunctionName stateFunc, FunctionName finalFunc, AbstractType<?> stateType, ByteBuffer initcond)
-    throws InvalidRequestException
-    {
-        List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
-        stateTypes.add(stateType);
-        stateTypes.addAll(argTypes);
-        List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
-        return new UDAggregate(name, argTypes, returnType,
-                               resolveScalar(name, stateFunc, stateTypes),
-                               finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null,
-                               initcond);
-    }
-
-    private static AbstractType<?> parseType(String str)
-    {
-        // We only use this when reading the schema where we shouldn't get an error
-        try
-        {
-            return TypeParser.parse(str);
-        }
-        catch (SyntaxException | ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Override
     public boolean equals(Object o)
     {
@@ -263,13 +189,13 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
             return false;
 
         UDAggregate that = (UDAggregate) o;
-        return Objects.equal(this.name, that.name)
-               && Functions.typeEquals(this.argTypes, that.argTypes)
-               && Functions.typeEquals(this.returnType, that.returnType)
-               && Objects.equal(this.stateFunction, that.stateFunction)
-               && Objects.equal(this.finalFunction, that.finalFunction)
-               && Objects.equal(this.stateType, that.stateType)
-               && Objects.equal(this.initcond, that.initcond);
+        return Objects.equal(name, that.name)
+            && Functions.typeEquals(argTypes, that.argTypes)
+            && Functions.typeEquals(returnType, that.returnType)
+            && Objects.equal(stateFunction, that.stateFunction)
+            && Objects.equal(finalFunction, that.finalFunction)
+            && Objects.equal(stateType, that.stateType)
+            && Objects.equal(initcond, that.initcond);
     }
 
     @Override