You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/27 20:36:46 UTC

[10/11] Rename Table to Keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c643e2b..0a40457 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -82,7 +82,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
 
-    public final Table table;
+    public final Keyspace keyspace;
     public final String name;
     public final CFMetaData metadata;
     public final IPartitioner partitioner;
@@ -217,7 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         try
         {
-            for (SSTableReader sstable : table.getAllSSTables())
+            for (SSTableReader sstable : keyspace.getAllSSTables())
                 if (sstable.compression)
                     sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance);
         }
@@ -227,7 +227,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    private ColumnFamilyStore(Table table,
+    private ColumnFamilyStore(Keyspace keyspace,
                               String columnFamilyName,
                               IPartitioner partitioner,
                               int generation,
@@ -235,9 +235,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                               Directories directories,
                               boolean loadSSTables)
     {
-        assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
+        assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
 
-        this.table = table;
+        this.keyspace = keyspace;
         name = columnFamilyName;
         this.metadata = metadata;
         this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold());
@@ -284,7 +284,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // register the mbean
         String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
-        mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.table.getName() + ",columnfamily=" + name;
+        mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
         try
         {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -326,7 +326,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             valid = false;
             unregisterMBean();
 
-            SystemTable.removeTruncationRecord(metadata.cfId);
+            SystemKeyspace.removeTruncationRecord(metadata.cfId);
             data.unreferenceSSTables();
             indexManager.invalidate();
         }
@@ -377,24 +377,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return data.getMeanColumns();
     }
 
-    public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, boolean loadSSTables)
+    public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, boolean loadSSTables)
     {
-        return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(table.getName(), columnFamily), loadSSTables);
+        return createColumnFamilyStore(keyspace, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables);
     }
 
-    public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
+    public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
     {
-        return createColumnFamilyStore(table, columnFamily, partitioner, metadata, true);
+        return createColumnFamilyStore(keyspace, columnFamily, partitioner, metadata, true);
     }
 
-    private static synchronized ColumnFamilyStore createColumnFamilyStore(Table table,
+    private static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace,
                                                                          String columnFamily,
                                                                          IPartitioner partitioner,
                                                                          CFMetaData metadata,
                                                                          boolean loadSSTables)
     {
         // get the max generation number, to prevent generation conflicts
-        Directories directories = Directories.create(table.getName(), columnFamily);
+        Directories directories = Directories.create(keyspace.getName(), columnFamily);
         Directories.SSTableLister lister = directories.sstableLister().includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -407,18 +407,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Collections.sort(generations);
         int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
 
-        return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata, directories, loadSSTables);
+        return new ColumnFamilyStore(keyspace, columnFamily, partitioner, value, metadata, directories, loadSSTables);
     }
 
     /**
      * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
      */
-    public static void scrubDataDirectories(String table, String columnFamily)
+    public static void scrubDataDirectories(String keyspaceName, String columnFamily)
     {
         logger.debug("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", columnFamily);
 
-        Directories directories = Directories.create(table, columnFamily);
+        Directories directories = Directories.create(keyspaceName, columnFamily);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
@@ -444,7 +444,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // cleanup incomplete saved caches
-        Pattern tmpCacheFilePattern = Pattern.compile(table + "-" + columnFamily + "-(Key|Row)Cache.*\\.tmp$");
+        Pattern tmpCacheFilePattern = Pattern.compile(keyspaceName + "-" + columnFamily + "-(Key|Row)Cache.*\\.tmp$");
         File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
 
         if (dir.exists())
@@ -457,11 +457,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // also clean out any index leftovers.
-        CFMetaData cfm = Schema.instance.getCFMetaData(table, columnFamily);
+        CFMetaData cfm = Schema.instance.getCFMetaData(keyspaceName, columnFamily);
         if (cfm != null) // secondary indexes aren't stored in DD.
         {
             for (ColumnDefinition def : cfm.allColumns())
-                scrubDataDirectories(table, cfm.indexColumnFamilyName(def));
+                scrubDataDirectories(keyspaceName, cfm.indexColumnFamilyName(def));
         }
     }
 
@@ -544,7 +544,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.info("completed loading ({} ms; {} keys) row cache for {}.{}",
                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
                         cachedRowsRead,
-                        table.getName(),
+                        keyspace.getName(),
                         name);
     }
 
@@ -557,8 +557,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public static synchronized void loadNewSSTables(String ksName, String cfName)
     {
         /** ks/cf existence checks will be done by open and getCFS methods for us */
-        Table table = Table.open(ksName);
-        table.getColumnFamilyStore(cfName).loadNewSSTables();
+        Keyspace keyspace = Keyspace.open(ksName);
+        keyspace.getColumnFamilyStore(cfName).loadNewSSTables();
     }
 
     /**
@@ -566,7 +566,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public synchronized void loadNewSSTables()
     {
-        logger.info("Loading new SSTables for " + table.getName() + "/" + name + "...");
+        logger.info("Loading new SSTables for " + keyspace.getName() + "/" + name + "...");
 
         Set<Descriptor> currentDescriptors = new HashSet<Descriptor>();
         for (SSTableReader sstable : data.getView().sstables)
@@ -627,11 +627,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         if (newSSTables.isEmpty())
         {
-            logger.info("No new SSTables were found for " + table.getName() + "/" + name);
+            logger.info("No new SSTables were found for " + keyspace.getName() + "/" + name);
             return;
         }
 
-        logger.info("Loading new SSTables and building secondary indexes for " + table.getName() + "/" + name + ": " + newSSTables);
+        logger.info("Loading new SSTables and building secondary indexes for " + keyspace.getName() + "/" + name + ": " + newSSTables);
         SSTableReader.acquireReferences(newSSTables);
         data.addSSTables(newSSTables);
         try
@@ -643,12 +643,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             SSTableReader.releaseReferences(newSSTables);
         }
 
-        logger.info("Done loading load new SSTables for " + table.getName() + "/" + name);
+        logger.info("Done loading load new SSTables for " + keyspace.getName() + "/" + name);
     }
 
     public static void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames)
     {
-        ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName);
+        ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
 
         Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
 
@@ -681,7 +681,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         Descriptor desc = new Descriptor(version,
                                          directory,
-                                         table.getName(),
+                                         keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
                                          true);
@@ -700,12 +700,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
          * all ongoing updates to memtables have completed. We can get the tail
          * of the log and use it as the starting position for log replay on recovery.
          *
-         * This is why we Table.switchLock needs to be global instead of per-Table:
+         * This is why we Keyspace.switchLock needs to be global instead of per-Keyspace:
          * we need to schedule discardCompletedSegments calls in the same order as their
          * contexts (commitlog position) were read, even though the flush executor
          * is multithreaded.
          */
-        Table.switchLock.writeLock().lock();
+        Keyspace.switchLock.writeLock().lock();
         try
         {
             final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
@@ -774,7 +774,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
         finally
         {
-            Table.switchLock.writeLock().unlock();
+            Keyspace.switchLock.writeLock().unlock();
         }
     }
 
@@ -820,7 +820,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     /**
      * Insert/Update the column family for this key.
-     * Caller is responsible for acquiring Table.flusherLock!
+     * Caller is responsible for acquiring Keyspace.switchLock
      * param @ lock - lock that needs to be used.
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
@@ -1005,7 +1005,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // cleanup size estimation only counts bytes for keys local to this node
         long expectedFileSize = 0;
-        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(table.getName());
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
         for (SSTableReader sstable : sstables)
         {
             List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(ranges);
@@ -1262,10 +1262,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             int gcBefore = gcBefore(filter.timestamp);
             if (isRowCacheEnabled())
             {
-                UUID cfId = Schema.instance.getId(table.getName(), name);
+                UUID cfId = Schema.instance.getId(keyspace.getName(), name);
                 if (cfId == null)
                 {
-                    logger.trace("no id found for {}.{}", table.getName(), name);
+                    logger.trace("no id found for {}.{}", keyspace.getName(), name);
                     return null;
                 }
 
@@ -1710,7 +1710,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
                     ssTable.createLinks(snapshotDirectory.getPath()); // hard links
                     if (logger.isDebugEnabled())
-                        logger.debug("Snapshot for " + table + " keyspace data file " + ssTable.getFilename() +
+                        logger.debug("Snapshot for " + keyspace + " keyspace data file " + ssTable.getFilename() +
                                      " created in " + snapshotDirectory);
                 }
 
@@ -1811,7 +1811,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public void invalidateCachedRow(DecoratedKey key)
     {
-        UUID cfId = Schema.instance.getId(table.getName(), this.name);
+        UUID cfId = Schema.instance.getId(keyspace.getName(), this.name);
         if (cfId == null)
             return; // secondary index
 
@@ -1825,10 +1825,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public static Iterable<ColumnFamilyStore> all()
     {
-        List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getTables().size());
-        for (Table table : Table.all())
+        List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());
+        for (Keyspace keyspace : Keyspace.all())
         {
-            stores.add(table.getColumnFamilyStores());
+            stores.add(keyspace.getColumnFamilyStores());
         }
         return Iterables.concat(stores);
     }
@@ -1896,7 +1896,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         else
         {
             // just nuke the memtable data w/o writing to disk first
-            Table.switchLock.writeLock().lock();
+            Keyspace.switchLock.writeLock().lock();
             try
             {
                 for (ColumnFamilyStore cfs : concatWithIndexes())
@@ -1910,7 +1910,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             finally
             {
-                Table.switchLock.writeLock().unlock();
+                Keyspace.switchLock.writeLock().unlock();
             }
         }
 
@@ -1922,14 +1922,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 final long truncatedAt = System.currentTimeMillis();
                 if (DatabaseDescriptor.isAutoSnapshot())
-                    snapshot(Table.getTimestampedSnapshotName(name));
+                    snapshot(Keyspace.getTimestampedSnapshotName(name));
 
                 ReplayPosition replayAfter = discardSSTables(truncatedAt);
 
                 for (SecondaryIndex index : indexManager.getIndexes())
                     index.truncateBlocking(truncatedAt);
 
-                SystemTable.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+                SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
 
                 logger.debug("cleaning out row cache");
                 for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
@@ -2055,7 +2055,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public String toString()
     {
         return "CFS(" +
-               "Keyspace='" + table.getName() + '\'' +
+               "Keyspace='" + keyspace.getName() + '\'' +
                ", ColumnFamily='" + name + '\'' +
                ')';
     }
@@ -2309,7 +2309,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long getTruncationTime()
     {
-        Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId);
+        Pair<ReplayPosition, Long> truncationRecord = SystemKeyspace.getTruncationRecords().get(metadata.cfId);
         return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index e62da1b..d642d08 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -81,12 +81,12 @@ public enum ConsistencyLevel
         return codeIdx[code];
     }
 
-    private int localQuorumFor(Table table, String dc)
+    private int localQuorumFor(Keyspace keyspace, String dc)
     {
-        return (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
+        return (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
     }
 
-    public int blockFor(Table table)
+    public int blockFor(Keyspace keyspace)
     {
         switch (this)
         {
@@ -99,16 +99,16 @@ public enum ConsistencyLevel
             case THREE:
                 return 3;
             case QUORUM:
-                return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+                return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
             case ALL:
-                return table.getReplicationStrategy().getReplicationFactor();
+                return keyspace.getReplicationStrategy().getReplicationFactor();
             case LOCAL_QUORUM:
-                return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
+                return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
             case EACH_QUORUM:
-                NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+                NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
                 int n = 0;
                 for (String dc : strategy.getDatacenters())
-                    n += localQuorumFor(table, dc);
+                    n += localQuorumFor(keyspace, dc);
                 return n;
             default:
                 throw new UnsupportedOperationException("Invalid consistency level: " + toString());
@@ -129,9 +129,9 @@ public enum ConsistencyLevel
         return count;
     }
 
-    private Map<String, Integer> countPerDCEndpoints(Table table, Iterable<InetAddress> liveEndpoints)
+    private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddress> liveEndpoints)
     {
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
 
         Map<String, Integer> dcEndpoints = new HashMap<String, Integer>();
         for (String dc: strategy.getDatacenters())
@@ -145,12 +145,12 @@ public enum ConsistencyLevel
         return dcEndpoints;
     }
 
-    public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints)
+    public List<InetAddress> filterForQuery(Keyspace keyspace, List<InetAddress> liveEndpoints)
     {
-        return filterForQuery(table, liveEndpoints, ReadRepairDecision.NONE);
+        return filterForQuery(keyspace, liveEndpoints, ReadRepairDecision.NONE);
     }
 
-    public List<InetAddress> filterForQuery(Table table, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
+    public List<InetAddress> filterForQuery(Keyspace keyspace, List<InetAddress> liveEndpoints, ReadRepairDecision readRepair)
     {
         /*
          * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
@@ -164,7 +164,7 @@ public enum ConsistencyLevel
         switch (readRepair)
         {
             case NONE:
-                return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(table)));
+                return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
             case GLOBAL:
                 return liveEndpoints;
             case DC_LOCAL:
@@ -178,7 +178,7 @@ public enum ConsistencyLevel
                         other.add(add);
                 }
                 // check if blockfor more than we have localep's
-                int blockFor = blockFor(table);
+                int blockFor = blockFor(keyspace);
                 if (local.size() < blockFor)
                     local.addAll(other.subList(0, Math.min(blockFor - local.size(), other.size())));
                 return local;
@@ -187,7 +187,7 @@ public enum ConsistencyLevel
         }
     }
 
-    public boolean isSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints)
+    public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddress> liveEndpoints)
     {
         switch (this)
         {
@@ -195,22 +195,22 @@ public enum ConsistencyLevel
                 // local hint is acceptable, and local node is always live
                 return true;
             case LOCAL_QUORUM:
-                return countLocalEndpoints(liveEndpoints) >= blockFor(table);
+                return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
             case EACH_QUORUM:
-                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
                 {
-                    if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+                    if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
                         return false;
                 }
                 return true;
             default:
-                return Iterables.size(liveEndpoints) >= blockFor(table);
+                return Iterables.size(liveEndpoints) >= blockFor(keyspace);
         }
     }
 
-    public void assureSufficientLiveNodes(Table table, Iterable<InetAddress> liveEndpoints) throws UnavailableException
+    public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddress> liveEndpoints) throws UnavailableException
     {
-        int blockFor = blockFor(table);
+        int blockFor = blockFor(keyspace);
         switch (this)
         {
             case ANY:
@@ -235,9 +235,9 @@ public enum ConsistencyLevel
                 }
                 break;
             case EACH_QUORUM:
-                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+                for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
                 {
-                    int dcBlockFor = localQuorumFor(table, entry.getKey());
+                    int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
                     int dcLive = entry.getValue();
                     if (dcLive < dcBlockFor)
                         throw new UnavailableException(this, dcBlockFor, dcLive);
@@ -254,12 +254,12 @@ public enum ConsistencyLevel
         }
     }
 
-    public void validateForRead(String table) throws InvalidRequestException
+    public void validateForRead(String keyspaceName) throws InvalidRequestException
     {
         switch (this)
         {
             case LOCAL_QUORUM:
-                requireNetworkTopologyStrategy(table);
+                requireNetworkTopologyStrategy(keyspaceName);
                 break;
             case ANY:
                 throw new InvalidRequestException("ANY ConsistencyLevel is only supported for writes");
@@ -268,26 +268,26 @@ public enum ConsistencyLevel
         }
     }
 
-    public void validateForWrite(String table) throws InvalidRequestException
+    public void validateForWrite(String keyspaceName) throws InvalidRequestException
     {
         switch (this)
         {
             case LOCAL_QUORUM:
             case EACH_QUORUM:
-                requireNetworkTopologyStrategy(table);
+                requireNetworkTopologyStrategy(keyspaceName);
                 break;
             case SERIAL:
                 throw new InvalidRequestException("You must use conditional updates for serializable writes");
         }
     }
 
-    public void validateForCas(String table) throws InvalidRequestException
+    public void validateForCas(String keyspaceName) throws InvalidRequestException
     {
         switch (this)
         {
             case LOCAL_QUORUM:
             case EACH_QUORUM:
-                requireNetworkTopologyStrategy(table);
+                requireNetworkTopologyStrategy(keyspaceName);
                 break;
             case ANY:
                 throw new InvalidRequestException("ANY is not supported with CAS. Use SERIAL if you mean, make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads");
@@ -310,9 +310,9 @@ public enum ConsistencyLevel
         }
     }
 
-    private void requireNetworkTopologyStrategy(String table) throws InvalidRequestException
+    private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException
     {
-        AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+        AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
         if (!(strategy instanceof NetworkTopologyStrategy))
             throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName()));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 9ace314..fb363c2 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -51,9 +51,9 @@ public class CounterMutation implements IMutation
         this.consistency = consistency;
     }
 
-    public String getTable()
+    public String getKeyspaceName()
     {
-        return rowMutation.getTable();
+        return rowMutation.getKeyspaceName();
     }
 
     public Collection<UUID> getColumnFamilyIds()
@@ -89,15 +89,15 @@ public class CounterMutation implements IMutation
         {
             if (!columnFamily.metadata().getReplicateOnWrite())
                 continue;
-            addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, timestamp, readCommands);
+            addReadCommandFromColumnFamily(rowMutation.getKeyspaceName(), rowMutation.key(), columnFamily, timestamp, readCommands);
         }
 
         // create a replication RowMutation
-        RowMutation replicationMutation = new RowMutation(rowMutation.getTable(), rowMutation.key());
+        RowMutation replicationMutation = new RowMutation(rowMutation.getKeyspaceName(), rowMutation.key());
         for (ReadCommand readCommand : readCommands)
         {
-            Table table = Table.open(readCommand.table);
-            Row row = readCommand.getRow(table);
+            Keyspace keyspace = Keyspace.open(readCommand.ksName);
+            Row row = readCommand.getRow(keyspace);
             if (row == null || row.cf == null)
                 continue;
 
@@ -107,11 +107,11 @@ public class CounterMutation implements IMutation
         return replicationMutation;
     }
 
-    private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
+    private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
     {
         SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
         Iterables.addAll(s, columnFamily.getColumnNames());
-        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
+        commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
     }
 
     public MessageOut<CounterMutation> makeMutationMessage()
@@ -130,13 +130,13 @@ public class CounterMutation implements IMutation
     public void apply()
     {
         // transform all CounterUpdateColumn to CounterColumn: accomplished by localCopy
-        RowMutation rm = new RowMutation(rowMutation.getTable(), ByteBufferUtil.clone(rowMutation.key()));
-        Table table = Table.open(rm.getTable());
+        RowMutation rm = new RowMutation(rowMutation.getKeyspaceName(), ByteBufferUtil.clone(rowMutation.key()));
+        Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
 
         for (ColumnFamily cf_ : rowMutation.getColumnFamilies())
         {
             ColumnFamily cf = cf_.cloneMeShallow();
-            ColumnFamilyStore cfs = table.getColumnFamilyStore(cf.id());
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
             for (Column column : cf_)
             {
                 cf.addColumn(column.localCopy(cfs), HeapAllocator.instance);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index c5a9c2b..af22b1b 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -344,7 +344,7 @@ public class DataTracker
         {
             if (logger.isDebugEnabled())
                 logger.debug(String.format("adding %s to list of files tracked for %s.%s",
-                            sstable.descriptor, cfstore.table.getName(), cfstore.name));
+                            sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
             long size = sstable.bytesOnDisk();
             StorageMetrics.load.inc(size);
             cfstore.metric.liveDiskSpaceUsed.inc(size);
@@ -359,7 +359,7 @@ public class DataTracker
         {
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files tracked for %s.%s",
-                            sstable.descriptor, cfstore.table.getName(), cfstore.name));
+                            sstable.descriptor, cfstore.keyspace.getName(), cfstore.name));
             long size = sstable.bytesOnDisk();
             StorageMetrics.load.dec(size);
             cfstore.metric.liveDiskSpaceUsed.dec(size);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index a7ad353..c4b9f84 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -46,7 +46,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Row
         {
             public void runMayThrow() throws Exception
             {
-                DefsTable.mergeSchema(message.payload);
+                DefsTables.mergeSchema(message.payload);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
deleted file mode 100644
index 6008e75..0000000
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
- * load/distribution easy, it replaces old mechanism when local migrations where serialized, stored in system.Migrations
- * and used for schema distribution.
- *
- * SCHEMA_KEYSPACES_CF layout:
- *
- * <key (AsciiType)>
- *   ascii => json_serialized_value
- *   ...
- * </key>
- *
- * Where <key> is a name of keyspace e.g. "ks".
- *
- * SCHEMA_COLUMNFAMILIES_CF layout:
- *
- * <key (AsciiType)>
- *     composite(ascii, ascii) => json_serialized_value
- * </key>
- *
- * Where <key> is a name of keyspace e.g. "ks"., first component of the column name is name of the ColumnFamily, last
- * component is the name of the ColumnFamily attribute.
- *
- * SCHEMA_COLUMNS_CF layout:
- *
- * <key (AsciiType)>
- *     composite(ascii, ascii, ascii) => json_serialized value
- * </key>
- *
- * Where <key> is a name of keyspace e.g. "ks".
- *
- * Column names where made composite to support 3-level nesting which represents following structure:
- * "ColumnFamily name":"column name":"column attribute" => "value"
- *
- * Example of schema (using CLI):
- *
- * schema_keyspaces
- * ----------------
- * RowKey: ks
- *  => (column=durable_writes, value=true, timestamp=1327061028312185000)
- *  => (column=name, value="ks", timestamp=1327061028312185000)
- *  => (column=replication_factor, value=0, timestamp=1327061028312185000)
- *  => (column=strategy_class, value="org.apache.cassandra.locator.NetworkTopologyStrategy", timestamp=1327061028312185000)
- *  => (column=strategy_options, value={"datacenter1":"1"}, timestamp=1327061028312185000)
- *
- * schema_columnfamilies
- * ---------------------
- * RowKey: ks
- *  => (column=cf:bloom_filter_fp_chance, value=0.0, timestamp=1327061105833119000)
- *  => (column=cf:caching, value="NONE", timestamp=1327061105833119000)
- *  => (column=cf:column_type, value="Standard", timestamp=1327061105833119000)
- *  => (column=cf:comment, value="ColumnFamily", timestamp=1327061105833119000)
- *  => (column=cf:default_validation_class, value="org.apache.cassandra.db.marshal.BytesType", timestamp=1327061105833119000)
- *  => (column=cf:gc_grace_seconds, value=864000, timestamp=1327061105833119000)
- *  => (column=cf:id, value=1000, timestamp=1327061105833119000)
- *  => (column=cf:key_alias, value="S0VZ", timestamp=1327061105833119000)
- *  ... part of the output omitted.
- *
- * schema_columns
- * --------------
- * RowKey: ks
- *  => (column=cf:c:index_name, value=null, timestamp=1327061105833119000)
- *  => (column=cf:c:index_options, value=null, timestamp=1327061105833119000)
- *  => (column=cf:c:index_type, value=null, timestamp=1327061105833119000)
- *  => (column=cf:c:name, value="aGVsbG8=", timestamp=1327061105833119000)
- *  => (column=cf:c:validation_class, value="org.apache.cassandra.db.marshal.AsciiType", timestamp=1327061105833119000)
- */
-public class DefsTable
-{
-    private static final Logger logger = LoggerFactory.getLogger(DefsTable.class);
-
-    /* saves keyspace definitions to system schema columnfamilies */
-    public static synchronized void save(Collection<KSMetaData> keyspaces)
-    {
-        long timestamp = System.currentTimeMillis();
-
-        for (KSMetaData ksMetaData : keyspaces)
-            ksMetaData.toSchema(timestamp).apply();
-    }
-
-    /**
-     * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_CF)
-     *
-     * @return Collection of found keyspace definitions
-     */
-    public static Collection<KSMetaData> loadFromTable()
-    {
-        List<Row> serializedSchema = SystemTable.serializedSchema(SystemTable.SCHEMA_KEYSPACES_CF);
-
-        List<KSMetaData> keyspaces = new ArrayList<KSMetaData>(serializedSchema.size());
-
-        for (Row row : serializedSchema)
-        {
-            if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row))
-                continue;
-
-            keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key)));
-        }
-
-        return keyspaces;
-    }
-
-    public static ByteBuffer searchComposite(String name, boolean start)
-    {
-        assert name != null;
-        ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
-        int length = nameBytes.remaining();
-        byte[] bytes = new byte[2 + length + 1];
-        bytes[0] = (byte)((length >> 8) & 0xFF);
-        bytes[1] = (byte)(length & 0xFF);
-        ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length);
-        bytes[bytes.length - 1] = (byte)(start ? 0 : 1);
-        return ByteBuffer.wrap(bytes);
-    }
-
-    private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
-    {
-        ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
-                                                                                         SystemTable.SCHEMA_COLUMNFAMILIES_CF,
-                                                                                         System.currentTimeMillis())));
-    }
-
-    /**
-     * Merge remote schema in form of row mutations with local and mutate ks/cf metadata objects
-     * (which also involves fs operations on add/drop ks/cf)
-     *
-     * @param mutations the schema changes to apply
-     *
-     * @throws ConfigurationException If one of metadata attributes has invalid value
-     * @throws IOException If data was corrupted during transportation or failed to apply fs operations
-     */
-    public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
-    {
-        // current state of the schema
-        Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF);
-        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-
-        for (RowMutation mutation : mutations)
-            mutation.apply();
-
-        if (!StorageService.instance.isClientMode())
-            flushSchemaCFs();
-
-        Schema.instance.updateVersionAndAnnounce();
-
-        // with new data applied
-        Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF);
-        Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-
-        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
-        mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
-
-        // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
-        for (String keyspaceToDrop : keyspacesToDrop)
-            dropKeyspace(keyspaceToDrop);
-
-    }
-
-    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
-    {
-        // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty)
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
-
-        /**
-         * At first step we check if any new keyspaces were added.
-         */
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-        {
-            ColumnFamily ksAttrs = entry.getValue();
-
-            // we don't care about nested ColumnFamilies here because those are going to be processed separately
-            if (!(ksAttrs.getColumnCount() == 0))
-                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList()));
-        }
-
-        /**
-         * At second step we check if there were any keyspaces re-created, in this context
-         * re-created means that they were previously deleted but still exist in the low-level schema as empty keys
-         */
-
-        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
-
-        // instead of looping over all modified entries and skipping processed keys all the time
-        // we would rather store "left to process" items and iterate over them removing already met keys
-        List<DecoratedKey> leftToProcess = new ArrayList<DecoratedKey>(modifiedEntries.size());
-
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : modifiedEntries.entrySet())
-        {
-            ColumnFamily prevValue = entry.getValue().leftValue();
-            ColumnFamily newValue = entry.getValue().rightValue();
-
-            if (prevValue.getColumnCount() == 0)
-            {
-                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList()));
-                continue;
-            }
-
-            leftToProcess.add(entry.getKey());
-        }
-
-        if (leftToProcess.size() == 0)
-            return Collections.emptySet();
-
-        /**
-         * At final step we updating modified keyspaces and saving keyspaces drop them later
-         */
-
-        Set<String> keyspacesToDrop = new HashSet<String>();
-
-        for (DecoratedKey key : leftToProcess)
-        {
-            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(key);
-
-            ColumnFamily newState = valueDiff.rightValue();
-
-            if (newState.getColumnCount() == 0)
-                keyspacesToDrop.add(AsciiType.instance.getString(key.key));
-            else
-                updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList()));
-        }
-
-        return keyspacesToDrop;
-    }
-
-    private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
-    {
-        // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty)
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
-
-        // check if any new Keyspaces with ColumnFamilies were added.
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-        {
-            ColumnFamily cfAttrs = entry.getValue();
-
-            if (!(cfAttrs.getColumnCount() == 0))
-            {
-               Map<String, CFMetaData> cfDefs = KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), cfAttrs));
-
-                for (CFMetaData cfDef : cfDefs.values())
-                    addColumnFamily(cfDef);
-            }
-        }
-
-        // deal with modified ColumnFamilies (remember that all of the keyspace nested ColumnFamilies are put to the single row)
-        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
-
-        for (DecoratedKey keyspace : modifiedEntries.keySet())
-        {
-            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(keyspace);
-
-            ColumnFamily prevValue = valueDiff.leftValue(); // state before external modification
-            ColumnFamily newValue = valueDiff.rightValue(); // updated state
-
-            Row newRow = new Row(keyspace, newValue);
-
-            if (prevValue.getColumnCount() == 0) // whole keyspace was deleted and now it's re-created
-            {
-                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(newRow).values())
-                    addColumnFamily(cfm);
-            }
-            else if (newValue.getColumnCount() == 0) // whole keyspace is deleted
-            {
-                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(new Row(keyspace, prevValue)).values())
-                    dropColumnFamily(cfm.ksName, cfm.cfName);
-            }
-            else // has modifications in the nested ColumnFamilies, need to perform nested diff to determine what was really changed
-            {
-                String ksName = AsciiType.instance.getString(keyspace.key);
-
-                Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>();
-                for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())
-                    oldCfDefs.put(cfm.cfName, cfm);
-
-                Map<String, CFMetaData> newCfDefs = KSMetaData.deserializeColumnFamilies(newRow);
-
-                MapDifference<String, CFMetaData> cfDefDiff = Maps.difference(oldCfDefs, newCfDefs);
-
-                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnRight().values())
-                    addColumnFamily(cfDef);
-
-                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnLeft().values())
-                    dropColumnFamily(cfDef.ksName, cfDef.cfName);
-
-                for (MapDifference.ValueDifference<CFMetaData> cfDef : cfDefDiff.entriesDiffering().values())
-                    updateColumnFamily(cfDef.rightValue());
-            }
-        }
-    }
-
-    private static void addKeyspace(KSMetaData ksm)
-    {
-        assert Schema.instance.getKSMetaData(ksm.name) == null;
-        Schema.instance.load(ksm);
-
-        if (!StorageService.instance.isClientMode())
-        {
-            Table.open(ksm.name);
-            MigrationManager.instance.notifyCreateKeyspace(ksm);
-        }
-    }
-
-    private static void addColumnFamily(CFMetaData cfm)
-    {
-        assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null;
-        KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
-        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
-
-        logger.info("Loading " + cfm);
-
-        Schema.instance.load(cfm);
-
-        // make sure it's init-ed w/ the old definitions first,
-        // since we're going to call initCf on the new one manually
-        Table.open(cfm.ksName);
-
-        Schema.instance.setTableDefinition(ksm);
-
-        if (!StorageService.instance.isClientMode())
-        {
-            Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
-            MigrationManager.instance.notifyCreateColumnFamily(cfm);
-        }
-    }
-
-    private static void updateKeyspace(KSMetaData newState)
-    {
-        KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
-        assert oldKsm != null;
-        KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values());
-
-        Schema.instance.setTableDefinition(newKsm);
-
-        if (!StorageService.instance.isClientMode())
-        {
-            Table.open(newState.name).createReplicationStrategy(newKsm);
-            MigrationManager.instance.notifyUpdateKeyspace(newKsm);
-        }
-    }
-
-    private static void updateColumnFamily(CFMetaData newState)
-    {
-        CFMetaData cfm = Schema.instance.getCFMetaData(newState.ksName, newState.cfName);
-        assert cfm != null;
-        cfm.reload();
-
-        if (!StorageService.instance.isClientMode())
-        {
-            Table table = Table.open(cfm.ksName);
-            table.getColumnFamilyStore(cfm.cfName).reload();
-            MigrationManager.instance.notifyUpdateColumnFamily(cfm);
-        }
-    }
-
-    private static void dropKeyspace(String ksName)
-    {
-        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
-        String snapshotName = Table.getTimestampedSnapshotName(ksName);
-
-        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
-
-        // remove all cfs from the table instance.
-        for (CFMetaData cfm : ksm.cfMetaData().values())
-        {
-            ColumnFamilyStore cfs = Table.open(ksm.name).getColumnFamilyStore(cfm.cfName);
-
-            Schema.instance.purge(cfm);
-
-            if (!StorageService.instance.isClientMode())
-            {
-                if (DatabaseDescriptor.isAutoSnapshot())
-                    cfs.snapshot(snapshotName);
-                Table.open(ksm.name).dropCf(cfm.cfId);
-            }
-        }
-
-        // remove the table from the static instances.
-        Table.clear(ksm.name);
-        Schema.instance.clearTableDefinition(ksm);
-        if (!StorageService.instance.isClientMode())
-        {
-            MigrationManager.instance.notifyDropKeyspace(ksm);
-        }
-    }
-
-    private static void dropColumnFamily(String ksName, String cfName)
-    {
-        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
-        assert ksm != null;
-        ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName);
-        assert cfs != null;
-
-        // reinitialize the table.
-        CFMetaData cfm = ksm.cfMetaData().get(cfName);
-
-        Schema.instance.purge(cfm);
-        Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm));
-
-        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
-
-        if (!StorageService.instance.isClientMode())
-        {
-            if (DatabaseDescriptor.isAutoSnapshot())
-                cfs.snapshot(Table.getTimestampedSnapshotName(cfs.name));
-            Table.open(ksm.name).dropCf(cfm.cfId);
-            MigrationManager.instance.notifyDropColumnFamily(cfm);
-        }
-    }
-
-    private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
-    {
-        // clone ksm but do not include the new def
-        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
-        newCfs.remove(toExclude);
-        assert newCfs.size() == ksm.cfMetaData().size() - 1;
-        return KSMetaData.cloneWith(ksm, newCfs);
-    }
-
-    private static void flushSchemaCFs()
-    {
-        flushSchemaCF(SystemTable.SCHEMA_KEYSPACES_CF);
-        flushSchemaCF(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-        flushSchemaCF(SystemTable.SCHEMA_COLUMNS_CF);
-    }
-
-    private static void flushSchemaCF(String cfName)
-    {
-        FBUtilities.waitOnFuture(SystemTable.schemaCFS(cfName).forceFlush());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
new file mode 100644
index 0000000..6f35ed8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
+ * load/distribution easy, it replaces old mechanism when local migrations where serialized, stored in system.Migrations
+ * and used for schema distribution.
+ *
+ * SCHEMA_KEYSPACES_CF layout:
+ *
+ * <key (AsciiType)>
+ *   ascii => json_serialized_value
+ *   ...
+ * </key>
+ *
+ * Where <key> is a name of keyspace e.g. "ks".
+ *
+ * SCHEMA_COLUMNFAMILIES_CF layout:
+ *
+ * <key (AsciiType)>
+ *     composite(ascii, ascii) => json_serialized_value
+ * </key>
+ *
+ * Where <key> is a name of keyspace e.g. "ks"., first component of the column name is name of the ColumnFamily, last
+ * component is the name of the ColumnFamily attribute.
+ *
+ * SCHEMA_COLUMNS_CF layout:
+ *
+ * <key (AsciiType)>
+ *     composite(ascii, ascii, ascii) => json_serialized value
+ * </key>
+ *
+ * Where <key> is a name of keyspace e.g. "ks".
+ *
+ * Column names where made composite to support 3-level nesting which represents following structure:
+ * "ColumnFamily name":"column name":"column attribute" => "value"
+ *
+ * Example of schema (using CLI):
+ *
+ * schema_keyspaces
+ * ----------------
+ * RowKey: ks
+ *  => (column=durable_writes, value=true, timestamp=1327061028312185000)
+ *  => (column=name, value="ks", timestamp=1327061028312185000)
+ *  => (column=replication_factor, value=0, timestamp=1327061028312185000)
+ *  => (column=strategy_class, value="org.apache.cassandra.locator.NetworkTopologyStrategy", timestamp=1327061028312185000)
+ *  => (column=strategy_options, value={"datacenter1":"1"}, timestamp=1327061028312185000)
+ *
+ * schema_columnfamilies
+ * ---------------------
+ * RowKey: ks
+ *  => (column=cf:bloom_filter_fp_chance, value=0.0, timestamp=1327061105833119000)
+ *  => (column=cf:caching, value="NONE", timestamp=1327061105833119000)
+ *  => (column=cf:column_type, value="Standard", timestamp=1327061105833119000)
+ *  => (column=cf:comment, value="ColumnFamily", timestamp=1327061105833119000)
+ *  => (column=cf:default_validation_class, value="org.apache.cassandra.db.marshal.BytesType", timestamp=1327061105833119000)
+ *  => (column=cf:gc_grace_seconds, value=864000, timestamp=1327061105833119000)
+ *  => (column=cf:id, value=1000, timestamp=1327061105833119000)
+ *  => (column=cf:key_alias, value="S0VZ", timestamp=1327061105833119000)
+ *  ... part of the output omitted.
+ *
+ * schema_columns
+ * --------------
+ * RowKey: ks
+ *  => (column=cf:c:index_name, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:index_options, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:index_type, value=null, timestamp=1327061105833119000)
+ *  => (column=cf:c:name, value="aGVsbG8=", timestamp=1327061105833119000)
+ *  => (column=cf:c:validation_class, value="org.apache.cassandra.db.marshal.AsciiType", timestamp=1327061105833119000)
+ */
+public class DefsTables
+{
+    private static final Logger logger = LoggerFactory.getLogger(DefsTables.class);
+
+    /* saves keyspace definitions to system schema columnfamilies */
+    public static synchronized void save(Collection<KSMetaData> keyspaces)
+    {
+        long timestamp = System.currentTimeMillis();
+
+        for (KSMetaData ksMetaData : keyspaces)
+            ksMetaData.toSchema(timestamp).apply();
+    }
+
+    /**
+     * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_CF)
+     *
+     * @return Collection of found keyspace definitions
+     */
+    public static Collection<KSMetaData> loadFromKeyspace()
+    {
+        List<Row> serializedSchema = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
+
+        List<KSMetaData> keyspaces = new ArrayList<KSMetaData>(serializedSchema.size());
+
+        for (Row row : serializedSchema)
+        {
+            if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row))
+                continue;
+
+            keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key)));
+        }
+
+        return keyspaces;
+    }
+
+    public static ByteBuffer searchComposite(String name, boolean start)
+    {
+        assert name != null;
+        ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
+        int length = nameBytes.remaining();
+        byte[] bytes = new byte[2 + length + 1];
+        bytes[0] = (byte)((length >> 8) & 0xFF);
+        bytes[1] = (byte)(length & 0xFF);
+        ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length);
+        bytes[bytes.length - 1] = (byte)(start ? 0 : 1);
+        return ByteBuffer.wrap(bytes);
+    }
+
+    private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
+    {
+        ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+                                                                                         SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
+                                                                                         System.currentTimeMillis())));
+    }
+
+    /**
+     * Merge remote schema in form of row mutations with local and mutate ks/cf metadata objects
+     * (which also involves fs operations on add/drop ks/cf)
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException If one of metadata attributes has invalid value
+     * @throws IOException If data was corrupted during transportation or failed to apply fs operations
+     */
+    public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
+    {
+        // current state of the schema
+        Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
+        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+
+        for (RowMutation mutation : mutations)
+            mutation.apply();
+
+        if (!StorageService.instance.isClientMode())
+            flushSchemaCFs();
+
+        Schema.instance.updateVersionAndAnnounce();
+
+        // with new data applied
+        Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
+        Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+
+        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
+        mergeColumnFamilies(oldColumnFamilies, newColumnFamilies);
+
+        // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
+        for (String keyspaceToDrop : keyspacesToDrop)
+            dropKeyspace(keyspaceToDrop);
+
+    }
+
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+    {
+        // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty)
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
+
+        /**
+         * At first step we check if any new keyspaces were added.
+         */
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily ksAttrs = entry.getValue();
+
+            // we don't care about nested ColumnFamilies here because those are going to be processed separately
+            if (!(ksAttrs.getColumnCount() == 0))
+                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList()));
+        }
+
+        /**
+         * At second step we check if there were any keyspaces re-created, in this context
+         * re-created means that they were previously deleted but still exist in the low-level schema as empty keys
+         */
+
+        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
+
+        // instead of looping over all modified entries and skipping processed keys all the time
+        // we would rather store "left to process" items and iterate over them removing already met keys
+        List<DecoratedKey> leftToProcess = new ArrayList<DecoratedKey>(modifiedEntries.size());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : modifiedEntries.entrySet())
+        {
+            ColumnFamily prevValue = entry.getValue().leftValue();
+            ColumnFamily newValue = entry.getValue().rightValue();
+
+            if (prevValue.getColumnCount() == 0)
+            {
+                addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList()));
+                continue;
+            }
+
+            leftToProcess.add(entry.getKey());
+        }
+
+        if (leftToProcess.size() == 0)
+            return Collections.emptySet();
+
+        /**
+         * At final step we updating modified keyspaces and saving keyspaces drop them later
+         */
+
+        Set<String> keyspacesToDrop = new HashSet<String>();
+
+        for (DecoratedKey key : leftToProcess)
+        {
+            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(key);
+
+            ColumnFamily newState = valueDiff.rightValue();
+
+            if (newState.getColumnCount() == 0)
+                keyspacesToDrop.add(AsciiType.instance.getString(key.key));
+            else
+                updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList()));
+        }
+
+        return keyspacesToDrop;
+    }
+
+    private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated)
+    {
+        // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty)
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated);
+
+        // check if any new Keyspaces with ColumnFamilies were added.
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+        {
+            ColumnFamily cfAttrs = entry.getValue();
+
+            if (!(cfAttrs.getColumnCount() == 0))
+            {
+               Map<String, CFMetaData> cfDefs = KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), cfAttrs));
+
+                for (CFMetaData cfDef : cfDefs.values())
+                    addColumnFamily(cfDef);
+            }
+        }
+
+        // deal with modified ColumnFamilies (remember that all of the keyspace nested ColumnFamilies are put to the single row)
+        Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering();
+
+        for (DecoratedKey keyspace : modifiedEntries.keySet())
+        {
+            MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(keyspace);
+
+            ColumnFamily prevValue = valueDiff.leftValue(); // state before external modification
+            ColumnFamily newValue = valueDiff.rightValue(); // updated state
+
+            Row newRow = new Row(keyspace, newValue);
+
+            if (prevValue.getColumnCount() == 0) // whole keyspace was deleted and now it's re-created
+            {
+                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(newRow).values())
+                    addColumnFamily(cfm);
+            }
+            else if (newValue.getColumnCount() == 0) // whole keyspace is deleted
+            {
+                for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(new Row(keyspace, prevValue)).values())
+                    dropColumnFamily(cfm.ksName, cfm.cfName);
+            }
+            else // has modifications in the nested ColumnFamilies, need to perform nested diff to determine what was really changed
+            {
+                String ksName = AsciiType.instance.getString(keyspace.key);
+
+                Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>();
+                for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())
+                    oldCfDefs.put(cfm.cfName, cfm);
+
+                Map<String, CFMetaData> newCfDefs = KSMetaData.deserializeColumnFamilies(newRow);
+
+                MapDifference<String, CFMetaData> cfDefDiff = Maps.difference(oldCfDefs, newCfDefs);
+
+                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnRight().values())
+                    addColumnFamily(cfDef);
+
+                for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnLeft().values())
+                    dropColumnFamily(cfDef.ksName, cfDef.cfName);
+
+                for (MapDifference.ValueDifference<CFMetaData> cfDef : cfDefDiff.entriesDiffering().values())
+                    updateColumnFamily(cfDef.rightValue());
+            }
+        }
+    }
+
+    private static void addKeyspace(KSMetaData ksm)
+    {
+        assert Schema.instance.getKSMetaData(ksm.name) == null;
+        Schema.instance.load(ksm);
+
+        if (!StorageService.instance.isClientMode())
+        {
+            Keyspace.open(ksm.name);
+            MigrationManager.instance.notifyCreateKeyspace(ksm);
+        }
+    }
+
+    private static void addColumnFamily(CFMetaData cfm)
+    {
+        assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null;
+        KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
+        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
+
+        logger.info("Loading " + cfm);
+
+        Schema.instance.load(cfm);
+
+        // make sure it's init-ed w/ the old definitions first,
+        // since we're going to call initCf on the new one manually
+        Keyspace.open(cfm.ksName);
+
+        Schema.instance.setKeyspaceDefinition(ksm);
+
+        if (!StorageService.instance.isClientMode())
+        {
+            Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
+            MigrationManager.instance.notifyCreateColumnFamily(cfm);
+        }
+    }
+
+    private static void updateKeyspace(KSMetaData newState)
+    {
+        KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name);
+        assert oldKsm != null;
+        KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values());
+
+        Schema.instance.setKeyspaceDefinition(newKsm);
+
+        if (!StorageService.instance.isClientMode())
+        {
+            Keyspace.open(newState.name).createReplicationStrategy(newKsm);
+            MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+        }
+    }
+
+    private static void updateColumnFamily(CFMetaData newState)
+    {
+        CFMetaData cfm = Schema.instance.getCFMetaData(newState.ksName, newState.cfName);
+        assert cfm != null;
+        cfm.reload();
+
+        if (!StorageService.instance.isClientMode())
+        {
+            Keyspace keyspace = Keyspace.open(cfm.ksName);
+            keyspace.getColumnFamilyStore(cfm.cfName).reload();
+            MigrationManager.instance.notifyUpdateColumnFamily(cfm);
+        }
+    }
+
+    private static void dropKeyspace(String ksName)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+        String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
+
+        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
+
+        // remove all cfs from the keyspace instance.
+        for (CFMetaData cfm : ksm.cfMetaData().values())
+        {
+            ColumnFamilyStore cfs = Keyspace.open(ksm.name).getColumnFamilyStore(cfm.cfName);
+
+            Schema.instance.purge(cfm);
+
+            if (!StorageService.instance.isClientMode())
+            {
+                if (DatabaseDescriptor.isAutoSnapshot())
+                    cfs.snapshot(snapshotName);
+                Keyspace.open(ksm.name).dropCf(cfm.cfId);
+            }
+        }
+
+        // remove the keyspace from the static instances.
+        Keyspace.clear(ksm.name);
+        Schema.instance.clearKeyspaceDefinition(ksm);
+        if (!StorageService.instance.isClientMode())
+        {
+            MigrationManager.instance.notifyDropKeyspace(ksm);
+        }
+    }
+
+    private static void dropColumnFamily(String ksName, String cfName)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+        assert ksm != null;
+        ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
+        assert cfs != null;
+
+        // reinitialize the keyspace.
+        CFMetaData cfm = ksm.cfMetaData().get(cfName);
+
+        Schema.instance.purge(cfm);
+        Schema.instance.setKeyspaceDefinition(makeNewKeyspaceDefinition(ksm, cfm));
+
+        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+
+        if (!StorageService.instance.isClientMode())
+        {
+            if (DatabaseDescriptor.isAutoSnapshot())
+                cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
+            Keyspace.open(ksm.name).dropCf(cfm.cfId);
+            MigrationManager.instance.notifyDropColumnFamily(cfm);
+        }
+    }
+
+    private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)
+    {
+        // clone ksm but do not include the new def
+        List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+        newCfs.remove(toExclude);
+        assert newCfs.size() == ksm.cfMetaData().size() - 1;
+        return KSMetaData.cloneWith(ksm, newCfs);
+    }
+
+    private static void flushSchemaCFs()
+    {
+        flushSchemaCF(SystemKeyspace.SCHEMA_KEYSPACES_CF);
+        flushSchemaCF(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
+        flushSchemaCF(SystemKeyspace.SCHEMA_COLUMNS_CF);
+    }
+
+    private static void flushSchemaCF(String cfName)
+    {
+        FBUtilities.waitOnFuture(SystemKeyspace.schemaCFS(cfName).forceFlush());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 0dd544e..5a0ac22 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -79,27 +79,27 @@ public class Directories
             dataFileLocations[i] = new DataDirectory(new File(locations[i]));
     }
 
-    private final String tablename;
+    private final String keyspacename;
     private final String cfname;
     private final File[] sstableDirectories;
 
-    public static Directories create(String tablename, String cfname)
+    public static Directories create(String keyspacename, String cfname)
     {
         int idx = cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
         if (idx > 0)
             // secondary index, goes in the same directory than the base cf
-            return new Directories(tablename, cfname, cfname.substring(0, idx));
+            return new Directories(keyspacename, cfname, cfname.substring(0, idx));
         else
-            return new Directories(tablename, cfname, cfname);
+            return new Directories(keyspacename, cfname, cfname);
     }
 
-    private Directories(String tablename, String cfname, String directoryName)
+    private Directories(String keyspacename, String cfname, String directoryName)
     {
-        this.tablename = tablename;
+        this.keyspacename = keyspacename;
         this.cfname = cfname;
         this.sstableDirectories = new File[dataFileLocations.length];
         for (int i = 0; i < dataFileLocations.length; ++i)
-            sstableDirectories[i] = new File(dataFileLocations[i].location, join(tablename, directoryName));
+            sstableDirectories[i] = new File(dataFileLocations[i].location, join(keyspacename, directoryName));
 
         if (!StorageService.instance.isClientMode())
         {
@@ -362,7 +362,7 @@ public class Directories
         private FileFilter getFilter()
         {
             // Note: the prefix needs to include cfname + separator to distinguish between a cfs and it's secondary indexes
-            final String sstablePrefix = tablename + Component.separator + cfname + Component.separator;
+            final String sstablePrefix = keyspacename + Component.separator + cfname + Component.separator;
             return new FileFilter()
             {
                 // This function always return false since accepts adds to the components map

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3a30701..9f21451 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -113,7 +113,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                                                                                  new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
                                                                                  "internal");
 
-    private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
+    private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
 
     /**
      * Returns a mutation representing a Hint to be sent to <code>targetId</code>
@@ -126,9 +126,9 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         // serialize the hint with id and version as a composite column name
         ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS, SystemTable.HINTS_CF));
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
         cf.addColumn(name, value, System.currentTimeMillis(), ttl);
-        return new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
+        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
     }
 
     /*
@@ -171,8 +171,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
     {
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
-        rm.delete(SystemTable.HINTS_CF, columnName, timestamp);
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
+        rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
         rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 
@@ -196,8 +196,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             return;
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
         ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
-        final RowMutation rm = new RowMutation(Table.SYSTEM_KS, hostIdBytes);
-        rm.delete(SystemTable.HINTS_CF, System.currentTimeMillis());
+        final RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, hostIdBytes);
+        rm.delete(SystemKeyspace.HINTS_CF, System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
         Runnable runnable = new Runnable()
@@ -250,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         }
         waited = 0;
         // then wait for the correct schema version.
-        // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system table.
+        // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system keyspace.
         // here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
         // causes the two to diverge (see CASSANDRA-2946)
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
@@ -328,7 +328,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         {
             long now = System.currentTimeMillis();
             QueryFilter filter = QueryFilter.getSliceFilter(epkey,
-                                                            SystemTable.HINTS_CF,
+                                                            SystemKeyspace.HINTS_CF,
                                                             startColumn,
                                                             ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                             false,
@@ -393,7 +393,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     Long truncatedAt = truncationTimesCache.get(cfId);
                     if (truncatedAt == null)
                     {
-                        ColumnFamilyStore cfs = Table.open(rm.getTable()).getColumnFamilyStore(cfId);
+                        ColumnFamilyStore cfs = Keyspace.open(rm.getKeyspaceName()).getColumnFamilyStore(cfId);
                         truncatedAt = cfs.getTruncationTime();
                         truncationTimesCache.put(cfId, truncatedAt);
                     }
@@ -579,8 +579,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         try
         {
-            RangeSliceCommand cmd = new RangeSliceCommand(Table.SYSTEM_KS,
-                                                          SystemTable.HINTS_CF,
+            RangeSliceCommand cmd = new RangeSliceCommand(Keyspace.SYSTEM_KS,
+                                                          SystemKeyspace.HINTS_CF,
                                                           System.currentTimeMillis(),
                                                           predicate,
                                                           range,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index b15b144..70bd79c 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -23,7 +23,7 @@ import java.util.UUID;
 
 public interface IMutation
 {
-    public String getTable();
+    public String getKeyspaceName();
     public Collection<UUID> getColumnFamilyIds();
     public ByteBuffer key();
     public void apply();