Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/27 20:36:45 UTC

[09/11] Rename Table to Keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
new file mode 100644
index 0000000..667a656
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * Represents a Keyspace: a named container of ColumnFamilyStores with a replication strategy.
+ */
+public class Keyspace
+{
+    public static final String SYSTEM_KS = "system";
+    private static final int DEFAULT_PAGE_SIZE = 10000;
+
+    private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
+
+    /**
+     * accesses to CFS.memtable should acquire this for thread safety.
+     * CFS.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation.
+     * <p/>
+     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
+     */
+    public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
+
+    // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
+    // proper directories here as well as in CassandraDaemon.
+    static
+    {
+        if (!StorageService.instance.isClientMode())
+            DatabaseDescriptor.createAllDirectories();
+    }
+
+    public final KSMetaData metadata;
+
+    /* ColumnFamilyStore per column family */
+    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
+    private volatile AbstractReplicationStrategy replicationStrategy;
+    public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
+    {
+        public Keyspace apply(String keyspaceName)
+        {
+            return Keyspace.open(keyspaceName);
+        }
+    };
+
+    public static Keyspace open(String keyspaceName)
+    {
+        return open(keyspaceName, Schema.instance, true);
+    }
+
+    public static Keyspace openWithoutSSTables(String keyspaceName)
+    {
+        return open(keyspaceName, Schema.instance, false);
+    }
+
+    private static Keyspace open(String keyspaceName, Schema schema, boolean loadSSTables)
+    {
+        Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+
+        if (keyspaceInstance == null)
+        {
+            // instantiate the Keyspace.  we could use putIfAbsent, but it's important to make sure it is only done once
+            // per keyspace, so we synchronize and re-check before doing it.
+            synchronized (Keyspace.class)
+            {
+                keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
+                if (keyspaceInstance == null)
+                {
+                    // open and store the keyspace
+                    keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
+                    schema.storeKeyspaceInstance(keyspaceInstance);
+
+                    // keyspace has to be constructed and in the cache before cacheRow can be called
+                    for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
+                        cfs.initRowCache();
+                }
+            }
+        }
+        return keyspaceInstance;
+    }
+
+    public static Keyspace clear(String keyspaceName)
+    {
+        return clear(keyspaceName, Schema.instance);
+    }
+
+    public static Keyspace clear(String keyspaceName, Schema schema)
+    {
+        synchronized (Keyspace.class)
+        {
+            Keyspace t = schema.removeKeyspaceInstance(keyspaceName);
+            if (t != null)
+            {
+                for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
+                    t.unloadCf(cfs);
+            }
+            return t;
+        }
+    }
+
+    /**
+     * Removes every SSTable in the directory from the appropriate DataTracker's view.
+     * @param directory the unreadable directory, which may or may not contain SSTables.
+     */
+    public static void removeUnreadableSSTables(File directory)
+    {
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores())
+            {
+                for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
+                    cfs.maybeRemoveUnreadableSSTables(directory);
+            }
+        }
+    }
+
+    public Collection<ColumnFamilyStore> getColumnFamilyStores()
+    {
+        return Collections.unmodifiableCollection(columnFamilyStores.values());
+    }
+
+    public ColumnFamilyStore getColumnFamilyStore(String cfName)
+    {
+        UUID id = Schema.instance.getId(getName(), cfName);
+        if (id == null)
+            throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
+        return getColumnFamilyStore(id);
+    }
+
+    public ColumnFamilyStore getColumnFamilyStore(UUID id)
+    {
+        ColumnFamilyStore cfs = columnFamilyStores.get(id);
+        if (cfs == null)
+            throw new IllegalArgumentException("Unknown CF " + id);
+        return cfs;
+    }
+
+    /**
+     * Takes a snapshot of the specified column family, or of the entire set of
+     * column families if columnFamilyName is null.
+     *
+     * @param snapshotName     the tag associated with the name of the snapshot.  This value may not be null
+     * @param columnFamilyName the column family to snapshot, or null to snapshot all of them
+     * @throws IOException if the column family doesn't exist
+     */
+    public void snapshot(String snapshotName, String columnFamilyName) throws IOException
+    {
+        assert snapshotName != null;
+        boolean tookSnapShot = false;
+        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+        {
+            if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
+            {
+                tookSnapShot = true;
+                cfStore.snapshot(snapshotName);
+            }
+        }
+
+        if ((columnFamilyName != null) && !tookSnapShot)
+            throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist.");
+    }
+
+    /**
+     * @param clientSuppliedName may be null.
+     * @return the name of the snapshot
+     */
+    public static String getTimestampedSnapshotName(String clientSuppliedName)
+    {
+        String snapshotName = Long.toString(System.currentTimeMillis());
+        if (clientSuppliedName != null && !clientSuppliedName.equals(""))
+        {
+            snapshotName = snapshotName + "-" + clientSuppliedName;
+        }
+        return snapshotName;
+    }
+
+    /**
+     * Checks whether a snapshot already exists for a given name.
+     *
+     * @param snapshotName the user supplied snapshot name
+     * @return true if the snapshot exists
+     */
+    public boolean snapshotExists(String snapshotName)
+    {
+        assert snapshotName != null;
+        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+        {
+            if (cfStore.snapshotExists(snapshotName))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Clear all the snapshots for a given keyspace.
+     *
+     * @param snapshotName the user-supplied snapshot name. If empty or null,
+     *                     all the snapshots will be cleaned
+     */
+    public void clearSnapshot(String snapshotName)
+    {
+        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+        {
+            cfStore.clearSnapshot(snapshotName);
+        }
+    }
+
+    /**
+     * @return A list of open SSTableReaders
+     */
+    public List<SSTableReader> getAllSSTables()
+    {
+        List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size());
+        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
+            list.addAll(cfStore.getSSTables());
+        return list;
+    }
+
+    private Keyspace(String keyspaceName, boolean loadSSTables)
+    {
+        metadata = Schema.instance.getKSMetaData(keyspaceName);
+        assert metadata != null : "Unknown keyspace " + keyspaceName;
+        createReplicationStrategy(metadata);
+
+        for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values()))
+        {
+            logger.debug("Initializing {}.{}", getName(), cfm.cfName);
+            initCf(cfm.cfId, cfm.cfName, loadSSTables);
+        }
+    }
+
+    public void createReplicationStrategy(KSMetaData ksm)
+    {
+        if (replicationStrategy != null)
+            StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
+
+        replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
+                                                                                    ksm.strategyClass,
+                                                                                    StorageService.instance.getTokenMetadata(),
+                                                                                    DatabaseDescriptor.getEndpointSnitch(),
+                                                                                    ksm.strategyOptions);
+    }
+
+    // best invoked on the compaction manager.
+    public void dropCf(UUID cfId)
+    {
+        assert columnFamilyStores.containsKey(cfId);
+        ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
+        if (cfs == null)
+            return;
+
+        unloadCf(cfs);
+    }
+
+    // disassociate a cfs from this keyspace instance.
+    private void unloadCf(ColumnFamilyStore cfs)
+    {
+        cfs.forceBlockingFlush();
+        cfs.invalidate();
+    }
+
+    /**
+     * Adds a cf to internal structures (ends up creating disk files).
+     */
+    public void initCf(UUID cfId, String cfName, boolean loadSSTables)
+    {
+        ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+
+        if (cfs == null)
+        {
+            // CFS being created for the first time, either on server startup or new CF being added.
+            // We don't worry about races here; startup is safe, and adding multiple identical CFs
+            // simultaneously is a "don't do that" scenario.
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+            // CFS mbean instantiation will error out before we hit this, but in case that changes...
+            if (oldCfs != null)
+                throw new IllegalStateException("added multiple mappings for cf id " + cfId);
+        }
+        else
+        {
+            // re-initializing an existing CF.  This will happen if you cleared the schema
+            // on this node and it's getting repopulated from the rest of the cluster.
+            assert cfs.name.equals(cfName);
+            cfs.metadata.reload();
+            cfs.reload();
+        }
+    }
+
+    public Row getRow(QueryFilter filter)
+    {
+        ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
+        ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
+        return new Row(filter.key, columnFamily);
+    }
+
+    public void apply(RowMutation mutation, boolean writeCommitLog)
+    {
+        apply(mutation, writeCommitLog, true);
+    }
+
+    /**
+     * This method appends a row to the global CommitLog, then updates memtables and indexes.
+     *
+     * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
+     *                       may happen concurrently, depending on the CL Executor type.
+     * @param writeCommitLog false to disable commitlog append entirely
+     * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
+     */
+    public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    {
+        // write the mutation to the commitlog and memtables
+        Tracing.trace("Acquiring switchLock read lock");
+        switchLock.readLock().lock();
+        try
+        {
+            if (writeCommitLog)
+            {
+                Tracing.trace("Appending to commitlog");
+                CommitLog.instance.add(mutation);
+            }
+
+            DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+            {
+                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+                if (cfs == null)
+                {
+                    logger.error("Attempting to mutate non-existent column family " + cf.id());
+                    continue;
+                }
+
+                Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
+                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
+            }
+        }
+        finally
+        {
+            switchLock.readLock().unlock();
+        }
+    }
+
+    public AbstractReplicationStrategy getReplicationStrategy()
+    {
+        return replicationStrategy;
+    }
+
+    /**
+     * @param key row to index
+     * @param cfs ColumnFamily to index row in
+     * @param idxNames columns to index, in comparator order
+     */
+    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
+
+        Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
+        switchLock.readLock().lock();
+        try
+        {
+            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
+            while (pager.hasNext())
+            {
+                ColumnFamily cf = pager.next();
+                ColumnFamily cf2 = cf.cloneMeShallow();
+                for (Column column : cf)
+                {
+                    if (cfs.indexManager.indexes(column.name(), indexes))
+                        cf2.addColumn(column);
+                }
+                cfs.indexManager.indexRow(key.key, cf2);
+            }
+        }
+        finally
+        {
+            switchLock.readLock().unlock();
+        }
+    }
+
+    public List<Future<?>> flush()
+    {
+        List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size());
+        for (UUID cfId : columnFamilyStores.keySet())
+            futures.add(columnFamilyStores.get(cfId).forceFlush());
+        return futures;
+    }
+
+    public static Iterable<Keyspace> all()
+    {
+        return Iterables.transform(Schema.instance.getKeyspaces(), keyspaceTransformer);
+    }
+
+    public static Iterable<Keyspace> nonSystem()
+    {
+        return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), keyspaceTransformer);
+    }
+
+    public static Iterable<Keyspace> system()
+    {
+        return Iterables.transform(Schema.systemKeyspaceNames, keyspaceTransformer);
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + "(name='" + getName() + "')";
+    }
+
+    public String getName()
+    {
+        return metadata.name;
+    }
+}
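
The open() path above uses a check / lock / re-check idiom rather than a bare
putIfAbsent, so that the expensive Keyspace constructor (and the initRowCache()
calls that must follow it) run exactly once per keyspace name. A minimal,
self-contained sketch of that idiom; the Registry class and its names are
illustrative, not Cassandra APIs:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public final class Registry
    {
        private static final ConcurrentMap<String, Registry> instances =
                new ConcurrentHashMap<String, Registry>();

        public static Registry open(String name)
        {
            Registry instance = instances.get(name);    // fast path: no lock taken
            if (instance == null)
            {
                synchronized (Registry.class)
                {
                    instance = instances.get(name);     // re-check under the lock
                    if (instance == null)
                    {
                        instance = new Registry(name);  // expensive init runs once
                        instances.put(name, instance);
                    }
                }
            }
            return instance;
        }

        private Registry(String name) { /* expensive construction goes here */ }
    }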

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c31c882..e323b69 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -410,7 +410,7 @@ public class Memtable
                         // and BL data is strictly local, so we don't need to preserve tombstones for repair.
                         // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
                         // See CASSANDRA-4667.
-                        if (cfs.name.equals(SystemTable.BATCHLOG_CF) && cfs.table.getName().equals(Table.SYSTEM_KS) && !(cf.getColumnCount() == 0))
+                        if (cfs.name.equals(SystemKeyspace.BATCHLOG_CF) && cfs.keyspace.getName().equals(Keyspace.SYSTEM_KS) && !(cf.getColumnCount() == 0))
                             continue;
 
                         // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 8986d5d..e593a98 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -40,7 +40,7 @@ public class MigrationRequestVerbHandler implements IVerbHandler
     {
         logger.debug("Received migration request from {}.", message.from);
 
-        Collection<RowMutation> schema = SystemTable.serializeSchema();
+        Collection<RowMutation> schema = SystemKeyspace.serializeSchema();
 
         MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
                                                                                                schema,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 0e1fa4f..265e9f7 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -104,7 +104,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
     public List<Row> executeLocally()
     {
-        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
         ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, timestamp);
         if (cfs.indexManager.hasIndexFor(rowFilter))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c037518..b4afbcd 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -128,7 +129,7 @@ public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
 
     public List<Row> executeLocally()
     {
-        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
         ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
         if (cfs.indexManager.hasIndexFor(rowFilter))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3031da8..cadcd7d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -63,28 +63,28 @@ public abstract class ReadCommand implements IReadCommand, Pageable
         return new MessageOut<ReadCommand>(MessagingService.Verb.READ, this, serializer);
     }
 
-    public final String table;
+    public final String ksName;
     public final String cfName;
     public final ByteBuffer key;
     public final long timestamp;
     private boolean isDigestQuery = false;
     protected final Type commandType;
 
-    protected ReadCommand(String table, ByteBuffer key, String cfName, long timestamp, Type cmdType)
+    protected ReadCommand(String ksName, ByteBuffer key, String cfName, long timestamp, Type cmdType)
     {
-        this.table = table;
+        this.ksName = ksName;
         this.key = key;
         this.cfName = cfName;
         this.timestamp = timestamp;
         this.commandType = cmdType;
     }
 
-    public static ReadCommand create(String table, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
+    public static ReadCommand create(String ksName, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
     {
         if (filter instanceof SliceQueryFilter)
-            return new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
+            return new SliceFromReadCommand(ksName, key, cfName, timestamp, (SliceQueryFilter)filter);
         else
-            return new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
+            return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, (NamesQueryFilter)filter);
     }
 
     public boolean isDigestQuery()
@@ -104,13 +104,13 @@ public abstract class ReadCommand implements IReadCommand, Pageable
 
     public abstract ReadCommand copy();
 
-    public abstract Row getRow(Table table);
+    public abstract Row getRow(Keyspace keyspace);
 
     public abstract IDiskAtomFilter filter();
 
     public String getKeyspace()
     {
-        return table;
+        return ksName;
     }
 
     // maybeGenerateRetryCommand is used to generate a retry for short reads
@@ -142,11 +142,11 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
         ByteBuffer superColumn = null;
         if (version < MessagingService.VERSION_20)
         {
-            CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+            CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }
@@ -186,11 +186,11 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
         ByteBuffer superColumn = null;
         if (version < MessagingService.VERSION_20)
         {
-            CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+            CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.ksName, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }
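
For callers, the visible change in ReadCommand is the table -> ksName rename
plus getRow(Table) -> getRow(Keyspace). A hypothetical local read against the
new names; "Keyspace1" and "Standard1" are placeholders, a suitable
IDiskAtomFilter is assumed to be in scope, and the usual java.nio /
org.apache.cassandra.utils imports are assumed:

    ByteBuffer key = ByteBufferUtil.bytes("key1");
    ReadCommand command = ReadCommand.create("Keyspace1", key, "Standard1",
                                             System.currentTimeMillis(), filter);
    Row row = command.getRow(Keyspace.open(command.ksName)); // was getRow(Table)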

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index fcab136..3fe6ec4 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -25,8 +25,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 /*
  * The read response message is sent by the server when reading data
- * this encapsulates the tablename and the row that has been read.
- * The table name is needed so that we can use it to create repairs.
+ * this encapsulates the keyspace name and the row that has been read.
+ * The keyspace name is needed so that we can use it to create repairs.
  */
 public class ReadResponse
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 2cb534a..24e02eb 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -40,8 +40,8 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
         }
 
         ReadCommand command = message.payload;
-        Table table = Table.open(command.table);
-        Row row = command.getRow(table);
+        Keyspace keyspace = Keyspace.open(command.ksName);
+        Row row = command.getRow(keyspace);
 
         MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
                                                                       getResponse(command, row),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index 7ca57a8..fe54917 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -29,16 +29,16 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
     static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
     public final int originalCount;
 
-    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
+    public RetriedSliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
     {
-        super(table, key, cfName, timestamp, filter);
+        super(keyspaceName, key, cfName, timestamp, filter);
         this.originalCount = originalCount;
     }
 
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, timestamp, filter, originalCount);
+        ReadCommand readCommand = new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index d78247b..2d4188e 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -42,30 +42,30 @@ public class RowMutation implements IMutation
 
     // todo this is redundant
     // when we remove it, also restore SerializationsTest.testRowMutationRead to not regenerate new RowMutations each test
-    private final String table;
+    private final String keyspaceName;
 
     private final ByteBuffer key;
     // map of column family id to mutations for that column family.
     private final Map<UUID, ColumnFamily> modifications;
 
-    public RowMutation(String table, ByteBuffer key)
+    public RowMutation(String keyspaceName, ByteBuffer key)
     {
-        this(table, key, new HashMap<UUID, ColumnFamily>());
+        this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
     }
 
-    public RowMutation(String table, ByteBuffer key, ColumnFamily cf)
+    public RowMutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
     {
-        this(table, key, Collections.singletonMap(cf.id(), cf));
+        this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
     }
 
-    public RowMutation(String table, Row row)
+    public RowMutation(String keyspaceName, Row row)
     {
-        this(table, row.key.key, row.cf);
+        this(keyspaceName, row.key.key, row.cf);
     }
 
-    protected RowMutation(String table, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+    protected RowMutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
     {
-        this.table = table;
+        this.keyspaceName = keyspaceName;
         this.key = key;
         this.modifications = modifications;
     }
@@ -75,9 +75,9 @@ public class RowMutation implements IMutation
         this(cf.metadata().ksName, key, cf);
     }
 
-    public String getTable()
+    public String getKeyspaceName()
     {
-        return table;
+        return keyspaceName;
     }
 
     public Collection<UUID> getColumnFamilyIds()
@@ -120,7 +120,7 @@ public class RowMutation implements IMutation
      */
     public ColumnFamily addOrGet(String cfName)
     {
-        CFMetaData cfm = Schema.instance.getCFMetaData(table, cfName);
+        CFMetaData cfm = Schema.instance.getCFMetaData(keyspaceName, cfName);
         ColumnFamily cf = modifications.get(cfm.cfId);
         if (cf == null)
         {
@@ -174,7 +174,7 @@ public class RowMutation implements IMutation
             throw new IllegalArgumentException();
 
         RowMutation rm = (RowMutation)m;
-        if (!table.equals(rm.table) || !key.equals(rm.key))
+        if (!keyspaceName.equals(rm.keyspaceName) || !key.equals(rm.key))
             throw new IllegalArgumentException();
 
         for (Map.Entry<UUID, ColumnFamily> entry : rm.modifications.entrySet())
@@ -189,17 +189,17 @@ public class RowMutation implements IMutation
 
     /*
      * This is equivalent to calling commit. Applies the changes to
-     * to the table that is obtained by calling Table.open().
+     * the keyspace that is obtained by calling Keyspace.open().
      */
     public void apply()
     {
-        Table ks = Table.open(table);
+        Keyspace ks = Keyspace.open(keyspaceName);
         ks.apply(this, ks.metadata.durableWrites);
     }
 
     public void applyUnsafe()
     {
-        Table.open(table).apply(this, false);
+        Keyspace.open(keyspaceName).apply(this, false);
     }
 
     public MessageOut<RowMutation> createMessage()
@@ -220,7 +220,7 @@ public class RowMutation implements IMutation
     public String toString(boolean shallow)
     {
         StringBuilder buff = new StringBuilder("RowMutation(");
-        buff.append("keyspace='").append(table).append('\'');
+        buff.append("keyspace='").append(keyspaceName).append('\'');
         buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
         buff.append(", modifications=[");
         if (shallow)
@@ -240,7 +240,7 @@ public class RowMutation implements IMutation
 
     public RowMutation without(UUID cfId)
     {
-        RowMutation rm = new RowMutation(table, key);
+        RowMutation rm = new RowMutation(keyspaceName, key);
         for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
             if (!entry.getKey().equals(cfId))
                 rm.add(entry.getValue());
@@ -252,7 +252,7 @@ public class RowMutation implements IMutation
         public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
         {
             if (version < MessagingService.VERSION_20)
-                out.writeUTF(rm.getTable());
+                out.writeUTF(rm.getKeyspaceName());
 
             ByteBufferUtil.writeWithShortLength(rm.key(), out);
 
@@ -266,9 +266,9 @@ public class RowMutation implements IMutation
 
         public RowMutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
         {
-            String table = null; // will always be set from cf.metadata but javac isn't smart enough to see that
+            String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
             if (version < MessagingService.VERSION_20)
-                table = in.readUTF();
+                keyspaceName = in.readUTF();
 
             ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
             int size = in.readInt();
@@ -279,7 +279,7 @@ public class RowMutation implements IMutation
             {
                 ColumnFamily cf = deserializeOneCf(in, version, flag);
                 modifications = Collections.singletonMap(cf.id(), cf);
-                table = cf.metadata().ksName;
+                keyspaceName = cf.metadata().ksName;
             }
             else
             {
@@ -288,11 +288,11 @@ public class RowMutation implements IMutation
                 {
                     ColumnFamily cf = deserializeOneCf(in, version, flag);
                     modifications.put(cf.id(), cf);
-                    table = cf.metadata().ksName;
+                    keyspaceName = cf.metadata().ksName;
                 }
             }
 
-            return new RowMutation(table, key, modifications);
+            return new RowMutation(keyspaceName, key, modifications);
         }
 
         private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
@@ -314,7 +314,7 @@ public class RowMutation implements IMutation
             int size = 0;
 
             if (version < MessagingService.VERSION_20)
-                size += sizes.sizeof(rm.getTable());
+                size += sizes.sizeof(rm.getKeyspaceName());
 
             int keySize = rm.key().remaining();
             size += sizes.sizeof((short) keySize) + keySize;
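
RowMutation.serializer above writes the keyspace name to the wire only for
pre-2.0 peers; newer peers recover it from cf.metadata().ksName, which is why
the field is flagged as redundant. A minimal, runnable sketch of that
version-gating pattern; VERSION_12/VERSION_20 here are illustrative stand-ins
for the MessagingService constants:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionedNameSerializer
    {
        static final int VERSION_12 = 6;
        static final int VERSION_20 = 7;

        static void serialize(String ksName, DataOutput out, int version) throws IOException
        {
            if (version < VERSION_20)
                out.writeUTF(ksName);   // legacy peers expect the name on the wire
            // for VERSION_20+ the name travels implicitly with the CF metadata
        }

        static String deserialize(DataInput in, int version, String nameFromCfMetadata) throws IOException
        {
            return version < VERSION_20 ? in.readUTF() : nameFromCfMetadata;
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize("system", new DataOutputStream(bytes), VERSION_12);
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            System.out.println(deserialize(in, VERSION_12, null)); // prints "system"
        }
    }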

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 2942249..ae3db78 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -36,30 +36,30 @@ public class SliceByNamesReadCommand extends ReadCommand
 
     public final NamesQueryFilter filter;
 
-    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
+    public SliceByNamesReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
     {
-        super(table, key, cfName, timestamp, Type.GET_BY_NAMES);
+        super(keyspaceName, key, cfName, timestamp, Type.GET_BY_NAMES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
+        ReadCommand readCommand = new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
 
-    public Row getRow(Table table)
+    public Row getRow(Keyspace keyspace)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
+        return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
     public String toString()
     {
         return "SliceByNamesReadCommand(" +
-               "table='" + table + '\'' +
+               "keyspace='" + ksName + '\'' +
                ", key=" + ByteBufferUtil.bytesToHex(key) +
                ", cfName='" + cfName + '\'' +
                ", timestamp='" + timestamp + '\'' +
@@ -84,7 +84,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
     {
         SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
         out.writeBoolean(command.isDigestQuery());
-        out.writeUTF(command.table);
+        out.writeUTF(command.ksName);
         ByteBufferUtil.writeWithShortLength(command.key, out);
 
         if (version < MessagingService.VERSION_20)
@@ -101,7 +101,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
     public ReadCommand deserialize(DataInput in, int version) throws IOException
     {
         boolean isDigest = in.readBoolean();
-        String table = in.readUTF();
+        String keyspaceName = in.readUTF();
         ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
 
         String cfName;
@@ -119,7 +119,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
         long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
 
-        CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         ReadCommand command;
         if (version < MessagingService.VERSION_20)
         {
@@ -141,14 +141,14 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
             // Due to SC compat, it's possible we get back a slice filter at this point
             if (filter instanceof NamesQueryFilter)
-                command = new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
+                command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, (NamesQueryFilter)filter);
             else
-                command = new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
+                command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, (SliceQueryFilter)filter);
         }
         else
         {
             NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
-            command = new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
+            command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
         }
 
         command.setDigestQuery(isDigest);
@@ -167,7 +167,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         int size = sizes.sizeof(command.isDigestQuery());
         int keySize = command.key.remaining();
 
-        size += sizes.sizeof(command.table);
+        size += sizes.sizeof(command.ksName);
         size += sizes.sizeof((short)keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 508d1d2..7526796 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -46,23 +46,23 @@ public class SliceFromReadCommand extends ReadCommand
 
     public final SliceQueryFilter filter;
 
-    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
+    public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
     {
-        super(table, key, cfName, timestamp, Type.GET_SLICES);
+        super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
+        ReadCommand readCommand = new SliceFromReadCommand(ksName, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
 
-    public Row getRow(Table table)
+    public Row getRow(Keyspace keyspace)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
+        return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
-            return new RetriedSliceFromReadCommand(table, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
+            return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
         }
 
         return null;
@@ -108,7 +108,7 @@ public class SliceFromReadCommand extends ReadCommand
 
     public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
     {
-        return new SliceFromReadCommand(table, key, cfName, timestamp, newFilter);
+        return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter);
     }
 
     /**
@@ -125,7 +125,7 @@ public class SliceFromReadCommand extends ReadCommand
     public String toString()
     {
         return "SliceFromReadCommand(" +
-               "table='" + table + '\'' +
+               "keyspace='" + ksName + '\'' +
                ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
                ", cfName='" + cfName + '\'' +
                ", timestamp='" + timestamp + '\'' +
@@ -145,7 +145,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
     {
         SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
         out.writeBoolean(realRM.isDigestQuery());
-        out.writeUTF(realRM.table);
+        out.writeUTF(realRM.ksName);
         ByteBufferUtil.writeWithShortLength(realRM.key, out);
 
         if (version < MessagingService.VERSION_20)
@@ -162,7 +162,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
     public ReadCommand deserialize(DataInput in, int version) throws IOException
     {
         boolean isDigest = in.readBoolean();
-        String table = in.readUTF();
+        String keyspaceName = in.readUTF();
         ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
 
         String cfName;
@@ -180,7 +180,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
 
         long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
 
-        CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         SliceQueryFilter filter;
         if (version < MessagingService.VERSION_20)
         {
@@ -194,7 +194,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
             filter = SliceQueryFilter.serializer.deserialize(in, version);
         }
 
-        ReadCommand command = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
+        ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
         command.setDigestQuery(isDigest);
         return command;
     }
@@ -211,7 +211,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         int keySize = command.key.remaining();
 
         int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
-        size += sizes.sizeof(command.table);
+        size += sizes.sizeof(command.ksName);
         size += sizes.sizeof((short) keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)

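The short-read retry sizing in maybeGenerateRetryCommand above solves
x * (liveCount / count) == count for x. A worked example with illustrative
numbers:

    // Asked for count = 100 columns, but only liveCountInRow = 20 were live:
    // the retry asks for 100 * 100 / 20 + 1 = 501 columns, so that roughly
    // 501 * (20/100) ~= 100 live columns come back.
    int count = 100, liveCountInRow = 20;
    int retryCount = liveCountInRow == 0 ? count + 1
                                         : ((count * count) / liveCountInRow) + 1;
    // retryCount == 501
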
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
new file mode 100644
index 0000000..d518468
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -0,0 +1,814 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.utils.*;
+
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
+public class SystemKeyspace
+{
+    private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class);
+
+    // see CFMetaData for schema definitions
+    public static final String PEERS_CF = "peers";
+    public static final String PEER_EVENTS_CF = "peer_events";
+    public static final String LOCAL_CF = "local";
+    public static final String INDEX_CF = "IndexInfo";
+    public static final String COUNTER_ID_CF = "NodeIdInfo";
+    public static final String HINTS_CF = "hints";
+    public static final String RANGE_XFERS_CF = "range_xfers";
+    public static final String BATCHLOG_CF = "batchlog";
+    // see layout description in the DefsTables class header
+    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
+    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
+    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
+    public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
+    public static final String COMPACTION_LOG = "compactions_in_progress";
+    public static final String PAXOS_CF = "paxos";
+
+    private static final String LOCAL_KEY = "local";
+    private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
+
+    public enum BootstrapState
+    {
+        NEEDS_BOOTSTRAP,
+        COMPLETED,
+        IN_PROGRESS
+    }
+
+    private static DecoratedKey decorate(ByteBuffer key)
+    {
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
+    public static void finishStartup()
+    {
+        setupVersion();
+
+        // add entries to system schema columnfamilies for the hardcoded system definitions
+        for (String ksname : Schema.systemKeyspaceNames)
+        {
+            KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
+
+            // delete old, possibly obsolete entries in schema columnfamilies
+            for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, SystemKeyspace.SCHEMA_COLUMNS_CF))
+            {
+                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
+                processInternal(req);
+            }
+
+            // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
+            ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
+        }
+    }
+
+    private static void setupVersion()
+    {
+        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')";
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        processInternal(String.format(req, LOCAL_CF,
+                                         LOCAL_KEY,
+                                         FBUtilities.getReleaseVersionString(),
+                                         QueryProcessor.CQL_VERSION.toString(),
+                                         cassandraConstants.VERSION,
+                                         snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
+                                         snitch.getRack(FBUtilities.getBroadcastAddress()),
+                                         DatabaseDescriptor.getPartitioner().getClass().getName()));
+    }
+
+    /**
+     * Writes a compaction log entry, except for columnfamilies under the system keyspace.
+     *
+     * @param cfs the column family store being compacted
+     * @param toCompact sstables to compact
+     * @return compaction task id, or null if cfs is under the system keyspace
+     */
+    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
+    {
+        if (Keyspace.SYSTEM_KS.equals(cfs.keyspace.getName()))
+            return null;
+
+        UUID compactionId = UUIDGen.getTimeUUID();
+        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
+        {
+            public Integer apply(SSTableReader sstable)
+            {
+                return sstable.descriptor.generation;
+            }
+        });
+        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.keyspace.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+        forceBlockingFlush(COMPACTION_LOG);
+        return compactionId;
+    }
+
+    public static void finishCompaction(UUID taskId)
+    {
+        assert taskId != null;
+
+        String req = "DELETE FROM system.%s WHERE id = %s";
+        processInternal(String.format(req, COMPACTION_LOG, taskId));
+        forceBlockingFlush(COMPACTION_LOG);
+    }
+
+    /**
+     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     */
+    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+    {
+        String req = "SELECT * FROM system.%s";
+        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+
+        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+        for (UntypedResultSet.Row row : resultSet)
+        {
+            String keyspace = row.getString("keyspace_name");
+            String columnfamily = row.getString("columnfamily_name");
+            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+
+            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+        }
+        return unfinishedCompactions;
+    }
+
+    public static void discardCompactionsInProgress()
+    {
+        ColumnFamilyStore compactionLog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+        compactionLog.truncateBlocking();
+    }
+
+    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    {
+        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
+        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    /**
+     * Remove the truncation record (truncation time and replay position) for the specified column family.
+     */
+    public static void removeTruncationRecord(UUID cfId)
+    {
+        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
+        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
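+    // Produces a CQL map-entry fragment of the form {<cfId>: 0x<hex blob>}, suitable for
+    // the "SET truncated_at = truncated_at + ..." update in saveTruncationRecord above.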
+    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        try
+        {
+            ReplayPosition.serializer.serialize(position, out);
+            out.writeLong(truncatedAt);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return String.format("{%s: 0x%s}",
+                             cfs.metadata.cfId,
+                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
+    }
+
+    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
+    {
+        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
+        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        if (rows.isEmpty())
+            return Collections.emptyMap();
+
+        UntypedResultSet.Row row = rows.one();
+        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
+        if (rawMap == null)
+            return Collections.emptyMap();
+
+        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
+        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
+            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
+        return positions;
+    }
+
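+    // The blob layout mirrors truncationAsMapEntry: a serialized ReplayPosition followed
+    // by the truncation timestamp. Records written before the timestamp was appended may
+    // omit it, which is why the in.available() check below falls back to Long.MIN_VALUE.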
+    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
+    {
+        try
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Record tokens being used by another node
+     */
+    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        {
+            removeEndpoint(ep);
+            return;
+        }
+
+        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+            return;
+
+        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
+        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
+    }
+
+    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
+    {
+        // with a 30 day TTL (2592000 seconds)
+        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
+        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
+    }
+
+    public static synchronized void updateSchemaVersion(UUID version)
+    {
+        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
+    }
+
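+    // Renders the tokens as a CQL set literal, e.g. {'tk1','tk2'} (the exact token
+    // strings depend on the partitioner's TokenFactory).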
+    private static String tokensAsSet(Collection<Token> tokens)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        Iterator<Token> iter = tokens.iterator();
+        while (iter.hasNext())
+        {
+            sb.append("'").append(factory.toString(iter.next())).append("'");
+            if (iter.hasNext())
+                sb.append(",");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
+    {
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
+        for (String tk : tokensStrings)
+            tokens.add(factory.fromString(tk));
+        return tokens;
+    }
+
+    /**
+     * Remove stored tokens being used by another node
+     */
+    public static synchronized void removeEndpoint(InetAddress ep)
+    {
+        String req = "DELETE FROM system.%s WHERE peer = '%s'";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        forceBlockingFlush(PEERS_CF);
+    }
+
+    /**
+     * Update the system keyspace with the new tokens for this node.
+     */
+    public static synchronized void updateTokens(Collection<Token> tokens)
+    {
+        assert !tokens.isEmpty() : "removeEndpoint should be used instead";
+        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
+    /**
+     * Convenience method to update the list of tokens in the local system keyspace.
+     *
+     * @param addTokens tokens to add
+     * @param rmTokens tokens to remove
+     * @return the collection of persisted tokens
+     */
+    public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
+    {
+        Collection<Token> tokens = getSavedTokens();
+        tokens.removeAll(rmTokens);
+        tokens.addAll(addTokens);
+        updateTokens(tokens);
+        return tokens;
+    }
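+    // Hypothetical usage sketch: swap a single retired token for a new one while keeping
+    // the rest of the node's tokens intact:
+    //
+    //     SystemKeyspace.updateLocalTokens(Collections.singleton(newToken),
+    //                                      Collections.singleton(oldToken));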
+
+    private static void forceBlockingFlush(String cfname)
+    {
+        Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
+    }
+
+    /**
+     * Return a multimap from each peer's IP address to its stored tokens.
+     */
+    public static SetMultimap<InetAddress, Token> loadTokens()
+    {
+        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("tokens"))
+                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
+        }
+
+        return tokenMap;
+    }
+
+    /**
+     * Return a map from each peer's IP address to its stored host_id.
+     */
+    public static Map<InetAddress, UUID> loadHostIds()
+    {
+        Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("host_id"))
+            {
+                hostIdMap.put(peer, row.getUUID("host_id"));
+            }
+        }
+        return hostIdMap;
+    }
+
+    /**
+     * Return a map from each peer's IP address to its data center and rack info.
+     */
+    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
+    {
+        Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
+        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
+        {
+            InetAddress peer = row.getInetAddress("peer");
+            if (row.has("data_center") && row.has("rack"))
+            {
+                Map<String, String> dcRack = new HashMap<String, String>();
+                dcRack.put("data_center", row.getString("data_center"));
+                dcRack.put("rack", row.getString("rack"));
+                result.put(peer, dcRack);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * One of three things will happen if you try to read the system keyspace:
+     * 1. files are present and you can read them: great
+     * 2. no files are there: great (new node is assumed)
+     * 3. files are present but you can't read them: bad
+     * @throws ConfigurationException
+     */
+    public static void checkHealth() throws ConfigurationException
+    {
+        Keyspace keyspace;
+        try
+        {
+            keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+        }
+        catch (AssertionError err)
+        {
+            // this happens when a user switches from OPP to RP.
+            ConfigurationException ex = new ConfigurationException("Could not read system keyspace!");
+            ex.initCause(err);
+            throw ex;
+        }
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_CF);
+
+        String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        if (result.isEmpty() || !result.one().has("cluster_name"))
+        {
+            // this is a brand new node
+            if (!cfs.getSSTables().isEmpty())
+                throw new ConfigurationException("Found system keyspace files, but they couldn't be loaded!");
+
+            // no system files.  this is a new node.
+            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
+            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
+            return;
+        }
+
+        String savedClusterName = result.one().getString("cluster_name");
+        if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
+            throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
+    }
+
+    public static Collection<Token> getSavedTokens()
+    {
+        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+        return result.isEmpty() || !result.one().has("tokens")
+             ? Collections.<Token>emptyList()
+             : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
+    }
+
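+    // Gossip generations must increase across restarts. Worked example: if the stored
+    // generation is G and the clock reads t < G + 1 (e.g. the clock stepped backwards),
+    // G + 1 is used instead of t so peers don't discard our state (see CASSANDRA-3654).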
+    public static int incrementAndGetGeneration()
+    {
+        String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        int generation;
+        if (result.isEmpty() || !result.one().has("gossip_generation"))
+        {
+            // seconds-since-epoch isn't a foolproof new generation
+            // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
+            // but it's as close as sanely possible
+            generation = (int) (System.currentTimeMillis() / 1000);
+        }
+        else
+        {
+            // Other nodes will ignore gossip messages about a node whose generation is lower than the one previously seen.
+            final int storedGeneration = result.one().getInt("gossip_generation") + 1;
+            final int now = (int) (System.currentTimeMillis() / 1000);
+            if (storedGeneration >= now)
+            {
+                logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}.  See CASSANDRA-3654 if you experience problems",
+                            storedGeneration, now);
+                generation = storedGeneration;
+            }
+            else
+            {
+                generation = now;
+            }
+        }
+
+        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
+        forceBlockingFlush(LOCAL_CF);
+
+        return generation;
+    }
+
+    public static BootstrapState getBootstrapState()
+    {
+        String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        if (result.isEmpty() || !result.one().has("bootstrapped"))
+            return BootstrapState.NEEDS_BOOTSTRAP;
+
+        return BootstrapState.valueOf(result.one().getString("bootstrapped"));
+    }
+
+    public static boolean bootstrapComplete()
+    {
+        return getBootstrapState() == BootstrapState.COMPLETED;
+    }
+
+    public static boolean bootstrapInProgress()
+    {
+        return getBootstrapState() == BootstrapState.IN_PROGRESS;
+    }
+
+    public static void setBootstrapState(BootstrapState state)
+    {
+        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
+        forceBlockingFlush(LOCAL_CF);
+    }
+
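+    // Built-index state is stored as one column per index under a row keyed by keyspace
+    // name: the column's presence (after tombstone removal) means the index is built.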
+    public static boolean isIndexBuilt(String keyspaceName, String indexName)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
+        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
+                                                        INDEX_CF,
+                                                        ByteBufferUtil.bytes(indexName),
+                                                        System.currentTimeMillis());
+        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
+    }
+
+    public static void setIndexBuilt(String keyspaceName, String indexName)
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
+        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
+        rm.apply();
+        forceBlockingFlush(INDEX_CF);
+    }
+
+    public static void setIndexRemoved(String keyspaceName, String indexName)
+    {
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
+        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
+        rm.apply();
+        forceBlockingFlush(INDEX_CF);
+    }
+
+    /**
+     * Read the host ID from the system keyspace, creating (and storing) one if
+     * none exists.
+     */
+    public static UUID getLocalHostId()
+    {
+        UUID hostId = null;
+
+        String req = "SELECT host_id FROM system.%s WHERE key='%s'";
+        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
+
+        // Look up the Host UUID (return it if found)
+        if (!result.isEmpty() && result.one().has("host_id"))
+        {
+            return result.one().getUUID("host_id");
+        }
+
+        // ID not found, generate a new one, persist, and then return it.
+        hostId = UUID.randomUUID();
+        logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
+
+        req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
+        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
+        return hostId;
+    }
+
+    /**
+     * Read the current local node id from the system keyspace, or return null if no
+     * such node id is recorded.
+     */
+    public static CounterId getCurrentLocalCounterId()
+    {
+        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+
+        // Get the last CounterId (CounterIds are timeuuids and thus ordered from oldest to newest)
+        QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
+                                                        COUNTER_ID_CF,
+                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                        true,
+                                                        1,
+                                                        System.currentTimeMillis());
+        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+        if (cf != null && cf.getColumnCount() != 0)
+            return CounterId.wrap(cf.iterator().next().name());
+        else
+            return null;
+    }
+
+    /**
+     * Write a new current local node id to the system keyspace.
+     *
+     * @param oldCounterId the previous local node id (that {@code newCounterId}
+     * replaces), or null if no such node id exists (new node or removed system
+     * keyspace)
+     * @param newCounterId the new current local node id to record
+     * @param now microsecond timestamp
+     */
+    public static void writeCurrentLocalCounterId(CounterId oldCounterId, CounterId newCounterId, long now)
+    {
+        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
+
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
+        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
+        rm.apply();
+        forceBlockingFlush(COUNTER_ID_CF);
+    }
+
+    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
+    {
+        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
+
+        Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS);
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
+        ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
+
+        CounterId previous = null;
+        for (Column c : cf)
+        {
+            if (previous != null)
+                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
+
+            // this will ignore the last column on purpose since it is the
+            // current local node id
+            previous = CounterId.wrap(c.name());
+        }
+        return l;
+    }
+
+    /**
+     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return the CFS responsible for holding the low-level serialized schema
+     */
+    public static ColumnFamilyStore schemaCFS(String cfName)
+    {
+        return Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(cfName);
+    }
+
+    public static List<Row> serializedSchema()
+    {
+        List<Row> schema = new ArrayList<Row>(3);
+
+        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
+        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
+
+        return schema;
+    }
+
+    /**
+     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
+     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
+     */
+    public static List<Row> serializedSchema(String schemaCfName)
+    {
+        Token minToken = StorageService.getPartitioner().getMinimumToken();
+
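+        // A range from minToken.minKeyBound() to minToken.maxKeyBound() spans the whole
+        // ring, so this reads every row of the given schema column family.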
+        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+                                                     null,
+                                                     new IdentityQueryFilter(),
+                                                     Integer.MAX_VALUE,
+                                                     System.currentTimeMillis());
+    }
+
+    public static Collection<RowMutation> serializeSchema()
+    {
+        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<DecoratedKey, RowMutation>();
+
+        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
+        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
+
+        return mutationMap.values();
+    }
+
+    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+    {
+        for (Row schemaRow : serializedSchema(schemaCfName))
+        {
+            if (Schema.ignoredSchemaRow(schemaRow))
+                continue;
+
+            RowMutation mutation = mutationMap.get(schemaRow.key);
+            if (mutation == null)
+            {
+                mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
+                mutationMap.put(schemaRow.key, mutation);
+            }
+
+            mutation.add(schemaRow.cf);
+        }
+    }
+
+    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
+    {
+        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
+
+        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
+            schema.put(schemaEntity.key, schemaEntity.cf);
+
+        return schema;
+    }
+
+    public static ByteBuffer getSchemaKSKey(String ksName)
+    {
+        return AsciiType.instance.fromString(ksName);
+    }
+
+    public static Row readSchemaRow(String ksName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_KEYSPACES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
+
+        return new Row(key, result);
+    }
+
+    public static Row readSchemaRow(String ksName, String cfName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
+
+        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
+        ColumnFamily result = schemaCFS.getColumnFamily(key,
+                                                        DefsTables.searchComposite(cfName, true),
+                                                        DefsTables.searchComposite(cfName, false),
+                                                        false,
+                                                        Integer.MAX_VALUE,
+                                                        System.currentTimeMillis());
+
+        return new Row(key, result);
+    }
+
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+    {
+        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
+        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
+        if (results.isEmpty())
+            return new PaxosState(key, metadata);
+        UntypedResultSet.Row row = results.one();
+        Commit inProgress = new Commit(key,
+                                       row.getUUID("in_progress_ballot"),
+                                       row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
+        // either most_recent_commit and most_recent_commit_at will both be set, or neither
+        Commit mostRecent = row.has("most_recent_commit")
+                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
+                          : Commit.emptyCommit(key, metadata);
+        return new PaxosState(inProgress, mostRecent);
+    }
+
+    public static void savePaxosPromise(Commit promise)
+    {
+        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
+        processInternal(String.format(req,
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(promise.ballot),
+                                      paxosTtl(promise.update.metadata),
+                                      promise.ballot,
+                                      ByteBufferUtil.bytesToHex(promise.key),
+                                      promise.update.id()));
+    }
+
+    public static void savePaxosProposal(Commit commit)
+    {
+        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(commit.ballot),
+                                      paxosTtl(commit.update.metadata),
+                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
+                                      ByteBufferUtil.bytesToHex(commit.key),
+                                      commit.update.id()));
+    }
+
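+    // With the default gc_grace_seconds (864000, i.e. 10 days), paxos state is kept for
+    // 10 days; the 3-hour floor only matters when gc_grace_seconds is set very low.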
+    private static int paxosTtl(CFMetaData metadata)
+    {
+        // keep paxos state around for at least 3h
+        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
+    }
+
+    public static void savePaxosCommit(Commit commit, boolean eraseInProgressProposal)
+    {
+        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        // identical to preserveCql, except that it also erases the in-progress proposal by setting proposal = null
+        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        processInternal(String.format(eraseInProgressProposal ? eraseCql : preserveCql,
+                                      PAXOS_CF,
+                                      UUIDGen.microsTimestamp(commit.ballot),
+                                      paxosTtl(commit.update.metadata),
+                                      commit.ballot,
+                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
+                                      ByteBufferUtil.bytesToHex(commit.key),
+                                      commit.update.id()));
+    }
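+    // Hypothetical paxos round tying the three writers above together (sketch only; the
+    // real coordination lives elsewhere in the paxos service code):
+    //
+    //     SystemKeyspace.savePaxosPromise(promise);          // prepare: record the ballot
+    //     SystemKeyspace.savePaxosProposal(accepted);        // propose: record the update
+    //     SystemKeyspace.savePaxosCommit(committed, true);   // commit, erasing the proposal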
+}