You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/27 20:36:44 UTC

[08/11] Rename Table to Keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
deleted file mode 100644
index 8528515..0000000
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ /dev/null
@@ -1,814 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.service.paxos.PaxosState;
-import org.apache.cassandra.thrift.cassandraConstants;
-import org.apache.cassandra.utils.*;
-
-import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
-
-public class SystemTable
-{
-    private static final Logger logger = LoggerFactory.getLogger(SystemTable.class);
-
-    // see CFMetaData for schema definitions
-    public static final String PEERS_CF = "peers";
-    public static final String PEER_EVENTS_CF = "peer_events";
-    public static final String LOCAL_CF = "local";
-    public static final String INDEX_CF = "IndexInfo";
-    public static final String COUNTER_ID_CF = "NodeIdInfo";
-    public static final String HINTS_CF = "hints";
-    public static final String RANGE_XFERS_CF = "range_xfers";
-    public static final String BATCHLOG_CF = "batchlog";
-    // see layout description in the DefsTable class header
-    public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
-    public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies";
-    public static final String SCHEMA_COLUMNS_CF = "schema_columns";
-    public static final String SCHEMA_TRIGGERS_CF = "schema_triggers";
-    public static final String COMPACTION_LOG = "compactions_in_progress";
-    public static final String PAXOS_CF = "paxos";
-
-    private static final String LOCAL_KEY = "local";
-    private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
-
-    public enum BootstrapState
-    {
-        NEEDS_BOOTSTRAP,
-        COMPLETED,
-        IN_PROGRESS
-    }
-
-    private static DecoratedKey decorate(ByteBuffer key)
-    {
-        return StorageService.getPartitioner().decorateKey(key);
-    }
-
-    public static void finishStartup()
-    {
-        setupVersion();
-
-        // add entries to system schema columnfamilies for the hardcoded system definitions
-        for (String ksname : Schema.systemKeyspaceNames)
-        {
-            KSMetaData ksmd = Schema.instance.getKSMetaData(ksname);
-
-            // delete old, possibly obsolete entries in schema columnfamilies
-            for (String cfname : Arrays.asList(SystemTable.SCHEMA_KEYSPACES_CF, SystemTable.SCHEMA_COLUMNFAMILIES_CF, SystemTable.SCHEMA_COLUMNS_CF))
-            {
-                String req = String.format("DELETE FROM system.%s WHERE keyspace_name = '%s'", cfname, ksmd.name);
-                processInternal(req);
-            }
-
-            // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
-            ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
-        }
-    }
-
-    private static void setupVersion()
-    {
-        String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, data_center, rack, partitioner) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')";
-        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        processInternal(String.format(req, LOCAL_CF,
-                                         LOCAL_KEY,
-                                         FBUtilities.getReleaseVersionString(),
-                                         QueryProcessor.CQL_VERSION.toString(),
-                                         cassandraConstants.VERSION,
-                                         snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
-                                         snitch.getRack(FBUtilities.getBroadcastAddress()),
-                                         DatabaseDescriptor.getPartitioner().getClass().getName()));
-    }
-
-    /**
-     * Write compaction log, except columfamilies under system keyspace.
-     *
-     * @param cfs
-     * @param toCompact sstables to compact
-     * @return compaction task id or null if cfs is under system keyspace
-     */
-    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
-    {
-        if (Table.SYSTEM_KS.equals(cfs.table.getName()))
-            return null;
-
-        UUID compactionId = UUIDGen.getTimeUUID();
-        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
-        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        });
-        processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
-        forceBlockingFlush(COMPACTION_LOG);
-        return compactionId;
-    }
-
-    public static void finishCompaction(UUID taskId)
-    {
-        assert taskId != null;
-
-        String req = "DELETE FROM system.%s WHERE id = %s";
-        processInternal(String.format(req, COMPACTION_LOG, taskId));
-        forceBlockingFlush(COMPACTION_LOG);
-    }
-
-    /**
-     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
-     */
-    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
-    {
-        String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
-
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
-        for (UntypedResultSet.Row row : resultSet)
-        {
-            String keyspace = row.getString("keyspace_name");
-            String columnfamily = row.getString("columnfamily_name");
-            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
-
-            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
-        }
-        return unfinishedCompactions;
-    }
-
-    public static void discardCompactionsInProgress()
-    {
-        ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
-        compactionLog.truncateBlocking();
-    }
-
-    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
-    {
-        String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
-        processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
-        forceBlockingFlush(LOCAL_CF);
-    }
-
-    /**
-     * This method is used to remove information about truncation time for specified column family
-     */
-    public static void removeTruncationRecord(UUID cfId)
-    {
-        String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
-        processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
-        forceBlockingFlush(LOCAL_CF);
-    }
-
-    private static String truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
-    {
-        DataOutputBuffer out = new DataOutputBuffer();
-        try
-        {
-            ReplayPosition.serializer.serialize(position, out);
-            out.writeLong(truncatedAt);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return String.format("{%s: 0x%s}",
-                             cfs.metadata.cfId,
-                             ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
-    }
-
-    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
-    {
-        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
-        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-        if (rows.isEmpty())
-            return Collections.emptyMap();
-
-        UntypedResultSet.Row row = rows.one();
-        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
-        if (rawMap == null)
-            return Collections.emptyMap();
-
-        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
-        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
-            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
-        return positions;
-    }
-
-    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
-    {
-        try
-        {
-            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
-            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Record tokens being used by another node
-     */
-    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
-    {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
-        {
-            removeEndpoint(ep);
-            return;
-        }
-
-        String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)";
-        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), tokensAsSet(tokens)));
-        forceBlockingFlush(PEERS_CF);
-    }
-
-    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
-    {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
-            return;
-
-        String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', %s)";
-        processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value));
-    }
-
-    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
-    {
-        // with 30 day TTL
-        String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ %s ] = %s WHERE peer = '%s'";
-        processInternal(String.format(req, PEER_EVENTS_CF, timePeriod.toString(), value, ep.getHostAddress()));
-    }
-
-    public static synchronized void updateSchemaVersion(UUID version)
-    {
-        String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, version.toString()));
-    }
-
-    private static String tokensAsSet(Collection<Token> tokens)
-    {
-        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
-        StringBuilder sb = new StringBuilder();
-        sb.append("{");
-        Iterator<Token> iter = tokens.iterator();
-        while (iter.hasNext())
-        {
-            sb.append("'").append(factory.toString(iter.next())).append("'");
-            if (iter.hasNext())
-                sb.append(",");
-        }
-        sb.append("}");
-        return sb.toString();
-    }
-
-    private static Collection<Token> deserializeTokens(Collection<String> tokensStrings)
-    {
-        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
-        List<Token> tokens = new ArrayList<Token>(tokensStrings.size());
-        for (String tk : tokensStrings)
-            tokens.add(factory.fromString(tk));
-        return tokens;
-    }
-
-    /**
-     * Remove stored tokens being used by another node
-     */
-    public static synchronized void removeEndpoint(InetAddress ep)
-    {
-        String req = "DELETE FROM system.%s WHERE peer = '%s'";
-        processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
-        forceBlockingFlush(PEERS_CF);
-    }
-
-    /**
-     * This method is used to update the System Table with the new tokens for this node
-    */
-    public static synchronized void updateTokens(Collection<Token> tokens)
-    {
-        assert !tokens.isEmpty() : "removeEndpoint should be used instead";
-        String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokensAsSet(tokens)));
-        forceBlockingFlush(LOCAL_CF);
-    }
-
-    /**
-     * Convenience method to update the list of tokens in the local system table.
-     *
-     * @param addTokens tokens to add
-     * @param rmTokens tokens to remove
-     * @return the collection of persisted tokens
-     */
-    public static synchronized Collection<Token> updateLocalTokens(Collection<Token> addTokens, Collection<Token> rmTokens)
-    {
-        Collection<Token> tokens = getSavedTokens();
-        tokens.removeAll(rmTokens);
-        tokens.addAll(addTokens);
-        updateTokens(tokens);
-        return tokens;
-    }
-
-    private static void forceBlockingFlush(String cfname)
-    {
-        Table.open(Table.SYSTEM_KS).getColumnFamilyStore(cfname).forceBlockingFlush();
-    }
-
-    /**
-     * Return a map of stored tokens to IP addresses
-     *
-     */
-    public static SetMultimap<InetAddress, Token> loadTokens()
-    {
-        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF))
-        {
-            InetAddress peer = row.getInetAddress("peer");
-            if (row.has("tokens"))
-                tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
-        }
-
-        return tokenMap;
-    }
-
-    /**
-     * Return a map of store host_ids to IP addresses
-     *
-     */
-    public static Map<InetAddress, UUID> loadHostIds()
-    {
-        Map<InetAddress, UUID> hostIdMap = new HashMap<InetAddress, UUID>();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, host_id FROM system." + PEERS_CF))
-        {
-            InetAddress peer = row.getInetAddress("peer");
-            if (row.has("host_id"))
-            {
-                hostIdMap.put(peer, row.getUUID("host_id"));
-            }
-        }
-        return hostIdMap;
-    }
-
-    /**
-     * Return a map of IP addresses containing a map of dc and rack info
-     */
-    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
-    {
-        Map<InetAddress, Map<String, String>> result = new HashMap<InetAddress, Map<String, String>>();
-        for (UntypedResultSet.Row row : processInternal("SELECT peer, data_center, rack from system." + PEERS_CF))
-        {
-            InetAddress peer = row.getInetAddress("peer");
-            if (row.has("data_center") && row.has("rack"))
-            {
-                Map<String, String> dcRack = new HashMap<String, String>();
-                dcRack.put("data_center", row.getString("data_center"));
-                dcRack.put("rack", row.getString("rack"));
-                result.put(peer, dcRack);
-            }
-        }
-        return result;
-    }
-
-    /**
-     * One of three things will happen if you try to read the system table:
-     * 1. files are present and you can read them: great
-     * 2. no files are there: great (new node is assumed)
-     * 3. files are present but you can't read them: bad
-     * @throws ConfigurationException
-     */
-    public static void checkHealth() throws ConfigurationException
-    {
-        Table table;
-        try
-        {
-            table = Table.open(Table.SYSTEM_KS);
-        }
-        catch (AssertionError err)
-        {
-            // this happens when a user switches from OPP to RP.
-            ConfigurationException ex = new ConfigurationException("Could not read system table!");
-            ex.initCause(err);
-            throw ex;
-        }
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(LOCAL_CF);
-
-        String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-
-        if (result.isEmpty() || !result.one().has("cluster_name"))
-        {
-            // this is a brand new node
-            if (!cfs.getSSTables().isEmpty())
-                throw new ConfigurationException("Found system table files, but they couldn't be loaded!");
-
-            // no system files.  this is a new node.
-            req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', '%s')";
-            processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, DatabaseDescriptor.getClusterName()));
-            return;
-        }
-
-        String savedClusterName = result.one().getString("cluster_name");
-        if (!DatabaseDescriptor.getClusterName().equals(savedClusterName))
-            throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
-    }
-
-    public static Collection<Token> getSavedTokens()
-    {
-        String req = "SELECT tokens FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-        return result.isEmpty() || !result.one().has("tokens")
-             ? Collections.<Token>emptyList()
-             : deserializeTokens(result.one().<String>getSet("tokens", UTF8Type.instance));
-    }
-
-    public static int incrementAndGetGeneration()
-    {
-        String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-
-        int generation;
-        if (result.isEmpty() || !result.one().has("gossip_generation"))
-        {
-            // seconds-since-epoch isn't a foolproof new generation
-            // (where foolproof is "guaranteed to be larger than the last one seen at this ip address"),
-            // but it's as close as sanely possible
-            generation = (int) (System.currentTimeMillis() / 1000);
-        }
-        else
-        {
-            // Other nodes will ignore gossip messages about a node that have a lower generation than previously seen.
-            final int storedGeneration = result.one().getInt("gossip_generation") + 1;
-            final int now = (int) (System.currentTimeMillis() / 1000);
-            if (storedGeneration >= now)
-            {
-                logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}.  See CASSANDRA-3654 if you experience problems",
-                            storedGeneration, now);
-                generation = storedGeneration;
-            }
-            else
-            {
-                generation = now;
-            }
-        }
-
-        req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', %d)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, generation));
-        forceBlockingFlush(LOCAL_CF);
-
-        return generation;
-    }
-
-    public static BootstrapState getBootstrapState()
-    {
-        String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-
-        if (result.isEmpty() || !result.one().has("bootstrapped"))
-            return BootstrapState.NEEDS_BOOTSTRAP;
-
-        return BootstrapState.valueOf(result.one().getString("bootstrapped"));
-    }
-
-    public static boolean bootstrapComplete()
-    {
-        return getBootstrapState() == BootstrapState.COMPLETED;
-    }
-
-    public static boolean bootstrapInProgress()
-    {
-        return getBootstrapState() == BootstrapState.IN_PROGRESS;
-    }
-
-    public static void setBootstrapState(BootstrapState state)
-    {
-        String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', '%s')";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, state.name()));
-        forceBlockingFlush(LOCAL_CF);
-    }
-
-    public static boolean isIndexBuilt(String table, String indexName)
-    {
-        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
-        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
-                                                        INDEX_CF,
-                                                        ByteBufferUtil.bytes(indexName),
-                                                        System.currentTimeMillis());
-        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
-    }
-
-    public static void setIndexBuilt(String table, String indexName)
-    {
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Table.SYSTEM_KS, INDEX_CF);
-        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, ByteBufferUtil.bytes(table), cf);
-        rm.apply();
-        forceBlockingFlush(INDEX_CF);
-    }
-
-    public static void setIndexRemoved(String table, String indexName)
-    {
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, ByteBufferUtil.bytes(table));
-        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
-        rm.apply();
-        forceBlockingFlush(INDEX_CF);
-    }
-
-    /**
-     * Read the host ID from the system table, creating (and storing) one if
-     * none exists.
-     */
-    public static UUID getLocalHostId()
-    {
-        UUID hostId = null;
-
-        String req = "SELECT host_id FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-
-        // Look up the Host UUID (return it if found)
-        if (!result.isEmpty() && result.one().has("host_id"))
-        {
-            return result.one().getUUID("host_id");
-        }
-
-        // ID not found, generate a new one, persist, and then return it.
-        hostId = UUID.randomUUID();
-        logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
-
-        req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
-        processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
-        return hostId;
-    }
-
-    /**
-     * Read the current local node id from the system table or null if no
-     * such node id is recorded.
-     */
-    public static CounterId getCurrentLocalCounterId()
-    {
-        Table table = Table.open(Table.SYSTEM_KS);
-
-        // Get the last CounterId (since CounterId are timeuuid is thus ordered from the older to the newer one)
-        QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
-                                                        COUNTER_ID_CF,
-                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                        true,
-                                                        1,
-                                                        System.currentTimeMillis());
-        ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
-        if (cf != null && cf.getColumnCount() != 0)
-            return CounterId.wrap(cf.iterator().next().name());
-        else
-            return null;
-    }
-
-    /**
-     * Write a new current local node id to the system table.
-     *
-     * @param oldCounterId the previous local node id (that {@code newCounterId}
-     * replace) or null if no such node id exists (new node or removed system
-     * table)
-     * @param newCounterId the new current local node id to record
-     * @param now microsecond time stamp.
-     */
-    public static void writeCurrentLocalCounterId(CounterId oldCounterId, CounterId newCounterId, long now)
-    {
-        ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
-
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Table.SYSTEM_KS, COUNTER_ID_CF);
-        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
-        rm.apply();
-        forceBlockingFlush(COUNTER_ID_CF);
-    }
-
-    public static List<CounterId.CounterIdRecord> getOldLocalCounterIds()
-    {
-        List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
-
-        Table table = Table.open(Table.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
-        ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
-
-        CounterId previous = null;
-        for (Column c : cf)
-        {
-            if (previous != null)
-                l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
-
-            // this will ignore the last column on purpose since it is the
-            // current local node id
-            previous = CounterId.wrap(c.name());
-        }
-        return l;
-    }
-
-    /**
-     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
-     * @return CFS responsible to hold low-level serialized schema
-     */
-    public static ColumnFamilyStore schemaCFS(String cfName)
-    {
-        return Table.open(Table.SYSTEM_KS).getColumnFamilyStore(cfName);
-    }
-
-    public static List<Row> serializedSchema()
-    {
-        List<Row> schema = new ArrayList<Row>(3);
-
-        schema.addAll(serializedSchema(SCHEMA_KEYSPACES_CF));
-        schema.addAll(serializedSchema(SCHEMA_COLUMNFAMILIES_CF));
-        schema.addAll(serializedSchema(SCHEMA_COLUMNS_CF));
-
-        return schema;
-    }
-
-    /**
-     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
-     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
-     */
-    public static List<Row> serializedSchema(String schemaCfName)
-    {
-        Token minToken = StorageService.getPartitioner().getMinimumToken();
-
-        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
-                                                     null,
-                                                     new IdentityQueryFilter(),
-                                                     Integer.MAX_VALUE,
-                                                     System.currentTimeMillis());
-    }
-
-    public static Collection<RowMutation> serializeSchema()
-    {
-        Map<DecoratedKey, RowMutation> mutationMap = new HashMap<DecoratedKey, RowMutation>();
-
-        serializeSchema(mutationMap, SCHEMA_KEYSPACES_CF);
-        serializeSchema(mutationMap, SCHEMA_COLUMNFAMILIES_CF);
-        serializeSchema(mutationMap, SCHEMA_COLUMNS_CF);
-
-        return mutationMap.values();
-    }
-
-    private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
-    {
-        for (Row schemaRow : serializedSchema(schemaCfName))
-        {
-            if (Schema.ignoredSchemaRow(schemaRow))
-                continue;
-
-            RowMutation mutation = mutationMap.get(schemaRow.key);
-            if (mutation == null)
-            {
-                mutation = new RowMutation(Table.SYSTEM_KS, schemaRow.key.key);
-                mutationMap.put(schemaRow.key, mutation);
-            }
-
-            mutation.add(schemaRow.cf);
-        }
-    }
-
-    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
-    {
-        Map<DecoratedKey, ColumnFamily> schema = new HashMap<DecoratedKey, ColumnFamily>();
-
-        for (Row schemaEntity : SystemTable.serializedSchema(cfName))
-            schema.put(schemaEntity.key, schemaEntity.cf);
-
-        return schema;
-    }
-
-    public static ByteBuffer getSchemaKSKey(String ksName)
-    {
-        return AsciiType.instance.fromString(ksName);
-    }
-
-    public static Row readSchemaRow(String ksName)
-    {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
-
-        ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
-        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
-
-        return new Row(key, result);
-    }
-
-    public static Row readSchemaRow(String ksName, String cfName)
-    {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
-
-        ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
-        ColumnFamily result = schemaCFS.getColumnFamily(key,
-                                                        DefsTable.searchComposite(cfName, true),
-                                                        DefsTable.searchComposite(cfName, false),
-                                                        false,
-                                                        Integer.MAX_VALUE,
-                                                        System.currentTimeMillis());
-
-        return new Row(key, result);
-    }
-
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
-    {
-        String req = "SELECT * FROM system.%s WHERE row_key = 0x%s AND cf_id = %s";
-        UntypedResultSet results = processInternal(String.format(req, PAXOS_CF, ByteBufferUtil.bytesToHex(key), metadata.cfId));
-        if (results.isEmpty())
-            return new PaxosState(key, metadata);
-        UntypedResultSet.Row row = results.one();
-        Commit inProgress = new Commit(key,
-                                       row.getUUID("in_progress_ballot"),
-                                       row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
-        // either most_recent_commit and most_recent_commit_at will both be set, or neither
-        Commit mostRecent = row.has("most_recent_commit")
-                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
-                          : Commit.emptyCommit(key, metadata);
-        return new PaxosState(inProgress, mostRecent);
-    }
-
-    public static void savePaxosPromise(Commit promise)
-    {
-        String req = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s WHERE row_key = 0x%s AND cf_id = %s";
-        processInternal(String.format(req,
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(promise.ballot),
-                                      paxosTtl(promise.update.metadata),
-                                      promise.ballot,
-                                      ByteBufferUtil.bytesToHex(promise.key),
-                                      promise.update.id()));
-    }
-
-    public static void savePaxosProposal(Commit commit)
-    {
-        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(commit.ballot),
-                                      paxosTtl(commit.update.metadata),
-                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
-                                      ByteBufferUtil.bytesToHex(commit.key),
-                                      commit.update.id()));
-    }
-
-    private static int paxosTtl(CFMetaData metadata)
-    {
-        // keep paxos state around for at least 3h
-        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
-    }
-
-    public static void savePaxosCommit(Commit commit, boolean eraseInProgressProposal)
-    {
-        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
-        // identical except adds proposal = null
-        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
-        processInternal(String.format(eraseInProgressProposal ? eraseCql : preserveCql,
-                                      PAXOS_CF,
-                                      UUIDGen.microsTimestamp(commit.ballot),
-                                      paxosTtl(commit.update.metadata),
-                                      commit.ballot,
-                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
-                                      ByteBufferUtil.bytesToHex(commit.key),
-                                      commit.update.id()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
deleted file mode 100644
index 409076f..0000000
--- a/src/java/org/apache/cassandra/db/Table.java
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPagers;
-import org.apache.cassandra.tracing.Tracing;
-
-/**
- * It represents a Keyspace.
- */
-public class Table
-{
-    public static final String SYSTEM_KS = "system";
-    private static final int DEFAULT_PAGE_SIZE = 10000;
-
-    private static final Logger logger = LoggerFactory.getLogger(Table.class);
-
-    /**
-     * accesses to CFS.memtable should acquire this for thread safety.
-     * CFS.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
-     * <p/>
-     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
-     */
-    public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
-
-    // It is possible to call Table.open without a running daemon, so it makes sense to ensure
-    // proper directories here as well as in CassandraDaemon.
-    static
-    {
-        if (!StorageService.instance.isClientMode())
-            DatabaseDescriptor.createAllDirectories();
-    }
-
-    public final KSMetaData metadata;
-
-    /* ColumnFamilyStore per column family */
-    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
-    private volatile AbstractReplicationStrategy replicationStrategy;
-    public static final Function<String,Table> tableTransformer = new Function<String, Table>()
-    {
-        public Table apply(String tableName)
-        {
-            return Table.open(tableName);
-        }
-    };
-
-    public static Table open(String table)
-    {
-        return open(table, Schema.instance, true);
-    }
-
-    public static Table openWithoutSSTables(String table)
-    {
-        return open(table, Schema.instance, false);
-    }
-
-    private static Table open(String table, Schema schema, boolean loadSSTables)
-    {
-        Table tableInstance = schema.getTableInstance(table);
-
-        if (tableInstance == null)
-        {
-            // instantiate the Table.  we could use putIfAbsent but it's important to making sure it is only done once
-            // per keyspace, so we synchronize and re-check before doing it.
-            synchronized (Table.class)
-            {
-                tableInstance = schema.getTableInstance(table);
-                if (tableInstance == null)
-                {
-                    // open and store the table
-                    tableInstance = new Table(table, loadSSTables);
-                    schema.storeTableInstance(tableInstance);
-
-                    // table has to be constructed and in the cache before cacheRow can be called
-                    for (ColumnFamilyStore cfs : tableInstance.getColumnFamilyStores())
-                        cfs.initRowCache();
-                }
-            }
-        }
-        return tableInstance;
-    }
-
-    public static Table clear(String table)
-    {
-        return clear(table, Schema.instance);
-    }
-
-    public static Table clear(String table, Schema schema)
-    {
-        synchronized (Table.class)
-        {
-            Table t = schema.removeTableInstance(table);
-            if (t != null)
-            {
-                for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
-                    t.unloadCf(cfs);
-            }
-            return t;
-        }
-    }
-
-    /**
-     * Removes every SSTable in the directory from the appropriate DataTracker's view.
-     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
-     */
-    public static void removeUnreadableSSTables(File directory)
-    {
-        for (Table table : Table.all())
-        {
-            for (ColumnFamilyStore baseCfs : table.getColumnFamilyStores())
-            {
-                for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
-                    cfs.maybeRemoveUnreadableSSTables(directory);
-            }
-        }
-    }
-
-    public Collection<ColumnFamilyStore> getColumnFamilyStores()
-    {
-        return Collections.unmodifiableCollection(columnFamilyStores.values());
-    }
-
-    public ColumnFamilyStore getColumnFamilyStore(String cfName)
-    {
-        UUID id = Schema.instance.getId(getName(), cfName);
-        if (id == null)
-            throw new IllegalArgumentException(String.format("Unknown table/cf pair (%s.%s)", getName(), cfName));
-        return getColumnFamilyStore(id);
-    }
-
-    public ColumnFamilyStore getColumnFamilyStore(UUID id)
-    {
-        ColumnFamilyStore cfs = columnFamilyStores.get(id);
-        if (cfs == null)
-            throw new IllegalArgumentException("Unknown CF " + id);
-        return cfs;
-    }
-
-    /**
-     * Take a snapshot of the specific column family, or the entire set of column families
-     * if columnFamily is null with a given timestamp
-     *
-     * @param snapshotName     the tag associated with the name of the snapshot.  This value may not be null
-     * @param columnFamilyName the column family to snapshot or all on null
-     * @throws IOException if the column family doesn't exist
-     */
-    public void snapshot(String snapshotName, String columnFamilyName) throws IOException
-    {
-        assert snapshotName != null;
-        boolean tookSnapShot = false;
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-        {
-            if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
-            {
-                tookSnapShot = true;
-                cfStore.snapshot(snapshotName);
-            }
-        }
-
-        if ((columnFamilyName != null) && !tookSnapShot)
-            throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist.");
-    }
-
-    /**
-     * @param clientSuppliedName may be null.
-     * @return the name of the snapshot
-     */
-    public static String getTimestampedSnapshotName(String clientSuppliedName)
-    {
-        String snapshotName = Long.toString(System.currentTimeMillis());
-        if (clientSuppliedName != null && !clientSuppliedName.equals(""))
-        {
-            snapshotName = snapshotName + "-" + clientSuppliedName;
-        }
-        return snapshotName;
-    }
-
-    /**
-     * Check whether snapshots already exists for a given name.
-     *
-     * @param snapshotName the user supplied snapshot name
-     * @return true if the snapshot exists
-     */
-    public boolean snapshotExists(String snapshotName)
-    {
-        assert snapshotName != null;
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-        {
-            if (cfStore.snapshotExists(snapshotName))
-                return true;
-        }
-        return false;
-    }
-
-    /**
-     * Clear all the snapshots for a given table.
-     *
-     * @param snapshotName the user supplied snapshot name. It empty or null,
-     *                     all the snapshots will be cleaned
-     */
-    public void clearSnapshot(String snapshotName)
-    {
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-        {
-            cfStore.clearSnapshot(snapshotName);
-        }
-    }
-
-    /**
-     * @return A list of open SSTableReaders
-     */
-    public List<SSTableReader> getAllSSTables()
-    {
-        List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size());
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-            list.addAll(cfStore.getSSTables());
-        return list;
-    }
-
-    private Table(String table, boolean loadSSTables)
-    {
-        metadata = Schema.instance.getKSMetaData(table);
-        assert metadata != null : "Unknown keyspace " + table;
-        createReplicationStrategy(metadata);
-
-        for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values()))
-        {
-            logger.debug("Initializing {}.{}", getName(), cfm.cfName);
-            initCf(cfm.cfId, cfm.cfName, loadSSTables);
-        }
-    }
-
-    public void createReplicationStrategy(KSMetaData ksm)
-    {
-        if (replicationStrategy != null)
-            StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
-
-        replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
-                                                                                    ksm.strategyClass,
-                                                                                    StorageService.instance.getTokenMetadata(),
-                                                                                    DatabaseDescriptor.getEndpointSnitch(),
-                                                                                    ksm.strategyOptions);
-    }
-
-    // best invoked on the compaction mananger.
-    public void dropCf(UUID cfId)
-    {
-        assert columnFamilyStores.containsKey(cfId);
-        ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
-        if (cfs == null)
-            return;
-
-        unloadCf(cfs);
-    }
-
-    // disassociate a cfs from this table instance.
-    private void unloadCf(ColumnFamilyStore cfs)
-    {
-        cfs.forceBlockingFlush();
-        cfs.invalidate();
-    }
-
-    /**
-     * adds a cf to internal structures, ends up creating disk files).
-     */
-    public void initCf(UUID cfId, String cfName, boolean loadSSTables)
-    {
-        ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
-
-        if (cfs == null)
-        {
-            // CFS being created for the first time, either on server startup or new CF being added.
-            // We don't worry about races here; startup is safe, and adding multiple idential CFs
-            // simultaneously is a "don't do that" scenario.
-            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
-            // CFS mbean instantiation will error out before we hit this, but in case that changes...
-            if (oldCfs != null)
-                throw new IllegalStateException("added multiple mappings for cf id " + cfId);
-        }
-        else
-        {
-            // re-initializing an existing CF.  This will happen if you cleared the schema
-            // on this node and it's getting repopulated from the rest of the cluster.
-            assert cfs.name.equals(cfName);
-            cfs.metadata.reload();
-            cfs.reload();
-        }
-    }
-
-    public Row getRow(QueryFilter filter)
-    {
-        ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
-        ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
-        return new Row(filter.key, columnFamily);
-    }
-
-    public void apply(RowMutation mutation, boolean writeCommitLog)
-    {
-        apply(mutation, writeCommitLog, true);
-    }
-
-    /**
-     * This method appends a row to the global CommitLog, then updates memtables and indexes.
-     *
-     * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
-     *                       may happen concurrently, depending on the CL Executor type.
-     * @param writeCommitLog false to disable commitlog append entirely
-     * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
-     */
-    public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
-    {
-        // write the mutation to the commitlog and memtables
-        Tracing.trace("Acquiring switchLock read lock");
-        switchLock.readLock().lock();
-        try
-        {
-            if (writeCommitLog)
-            {
-                Tracing.trace("Appending to commitlog");
-                CommitLog.instance.add(mutation);
-            }
-
-            DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
-            for (ColumnFamily cf : mutation.getColumnFamilies())
-            {
-                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
-                if (cfs == null)
-                {
-                    logger.error("Attempting to mutate non-existant column family " + cf.id());
-                    continue;
-                }
-
-                Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
-                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
-            }
-        }
-        finally
-        {
-            switchLock.readLock().unlock();
-        }
-    }
-
-    public AbstractReplicationStrategy getReplicationStrategy()
-    {
-        return replicationStrategy;
-    }
-
-    /**
-     * @param key row to index
-     * @param cfs ColumnFamily to index row in
-     * @param idxNames columns to index, in comparator order
-     */
-    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
-
-        Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
-
-        switchLock.readLock().lock();
-        try
-        {
-            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
-            while (pager.hasNext())
-            {
-                ColumnFamily cf = pager.next();
-                ColumnFamily cf2 = cf.cloneMeShallow();
-                for (Column column : cf)
-                {
-                    if (cfs.indexManager.indexes(column.name(), indexes))
-                        cf2.addColumn(column);
-                }
-                cfs.indexManager.indexRow(key.key, cf2);
-            }
-        }
-        finally
-        {
-            switchLock.readLock().unlock();
-        }
-    }
-
-    public List<Future<?>> flush()
-    {
-        List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size());
-        for (UUID cfId : columnFamilyStores.keySet())
-            futures.add(columnFamilyStores.get(cfId).forceFlush());
-        return futures;
-    }
-
-    public static Iterable<Table> all()
-    {
-        return Iterables.transform(Schema.instance.getTables(), tableTransformer);
-    }
-
-    public static Iterable<Table> nonSystem()
-    {
-        return Iterables.transform(Schema.instance.getNonSystemTables(), tableTransformer);
-    }
-
-    public static Iterable<Table> system()
-    {
-        return Iterables.transform(Schema.systemKeyspaceNames, tableTransformer);
-    }
-
-    @Override
-    public String toString()
-    {
-        return getClass().getSimpleName() + "(name='" + getName() + "')";
-    }
-
-    public String getName()
-    {
-        return metadata.name;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 6ebedc7..226262c 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -36,7 +36,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation>
         Tracing.trace("Applying truncation of {}.{}", t.keyspace, t.columnFamily);
         try
         {
-            ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
+            ColumnFamilyStore cfs = Keyspace.open(t.keyspace).getColumnFamilyStore(t.columnFamily);
             cfs.truncateBlocking();
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
index 5cfdd27..4faa651 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
@@ -44,7 +44,7 @@ public class SSTableSliceIterator implements OnDiskAtomIterator
 
     /**
      * An iterator for a slice within an SSTable
-     * @param sstable Table for the CFS we are reading from
+     * @param sstable Keyspace for the CFS we are reading from
      * @param file Optional parameter that input is read from.  If null is passed, this class creates an appropriate one automatically.
      * If this class creates, it will close the underlying file when #close() is called.
      * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index 2855979..d56bf7a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -132,7 +132,7 @@ public class CommitLogAllocator
         assert !activeSegments.contains(next);
         activeSegments.add(next);
         if (isCapExceeded())
-            flushOldestTables();
+            flushOldestKeyspaces();
 
         return next;
     }
@@ -288,7 +288,7 @@ public class CommitLogAllocator
     /**
      * Force a flush on all dirty CFs represented in the oldest commitlog segment
      */
-    private void flushOldestTables()
+    private void flushOldestKeyspaces()
     {
         CommitLogSegment oldestSegment = activeSegments.peek();
 
@@ -297,8 +297,8 @@ public class CommitLogAllocator
             for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
             {
                 String keypace = Schema.instance.getCF(dirtyCFId).left;
-                final ColumnFamilyStore cfs = Table.open(keypace).getColumnFamilyStore(dirtyCFId);
-                // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
+                final ColumnFamilyStore cfs = Keyspace.open(keypace).getColumnFamilyStore(dirtyCFId);
+                // flush shouldn't run on the commitlog executor, since it acquires Keyspace.switchLock,
                 // which may already be held by a thread waiting for the CL executor (via getContext),
                 // causing deadlock
                 Runnable runnable = new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a351f7f..a32da86 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -48,7 +48,7 @@ public class CommitLogReplayer
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
 
-    private final Set<Table> tablesRecovered;
+    private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
     private final Map<UUID, AtomicInteger> invalidMutations;
     private final AtomicInteger replayedCount;
@@ -59,7 +59,7 @@ public class CommitLogReplayer
 
     public CommitLogReplayer()
     {
-        this.tablesRecovered = new NonBlockingHashSet<Table>();
+        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayList<Future<?>>();
         this.buffer = new byte[4096];
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
@@ -70,7 +70,7 @@ public class CommitLogReplayer
         // compute per-CF and global replay positions
         cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
-        Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemTable.getTruncationRecords();
+        Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemKeyspace.getTruncationRecords();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
@@ -105,10 +105,10 @@ public class CommitLogReplayer
         FBUtilities.waitOnFutures(futures);
         logger.debug("Finished waiting on mutations from recovery");
 
-        // flush replayed tables
+        // flush replayed keyspaces
         futures.clear();
-        for (Table table : tablesRecovered)
-            futures.addAll(table.flush());
+        for (Keyspace keyspace : keyspacesRecovered)
+            futures.addAll(keyspace.flush());
         FBUtilities.waitOnFutures(futures);
         return replayedCount.get();
     }
@@ -161,7 +161,7 @@ public class CommitLogReplayer
                     }
 
                     // RowMutation must be at LEAST 10 bytes:
-                    // 3 each for a non-empty Table and Key (including the
+                    // 3 each for a non-empty Keyspace and Key (including the
                     // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                     // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                     if (serializedSize < 10)
@@ -221,7 +221,7 @@ public class CommitLogReplayer
                 }
 
                 if (logger.isDebugEnabled())
-                    logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
+                    logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
                             + "}"));
 
                 final long entryLocation = reader.getFilePointer();
@@ -230,12 +230,12 @@ public class CommitLogReplayer
                 {
                     public void runMayThrow() throws IOException
                     {
-                        if (Schema.instance.getKSMetaData(frm.getTable()) == null)
+                        if (Schema.instance.getKSMetaData(frm.getKeyspaceName()) == null)
                             return;
                         if (pointInTimeExceeded(frm))
                             return;
 
-                        final Table table = Table.open(frm.getTable());
+                        final Keyspace keyspace = Keyspace.open(frm.getKeyspaceName());
 
                         // Rebuild the row mutation, omitting column families that 
                         // a) have already been flushed,
@@ -254,7 +254,7 @@ public class CommitLogReplayer
                             if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
                             {
                                 if (newRm == null)
-                                    newRm = new RowMutation(frm.getTable(), frm.key());
+                                    newRm = new RowMutation(frm.getKeyspaceName(), frm.key());
                                 newRm.add(columnFamily);
                                 replayedCount.incrementAndGet();
                             }
@@ -262,8 +262,8 @@ public class CommitLogReplayer
                         if (newRm != null)
                         {
                             assert !newRm.isEmpty();
-                            Table.open(newRm.getTable()).apply(newRm, false);
-                            tablesRecovered.add(table);
+                            Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+                            keyspacesRecovered.add(keyspace);
                         }
                     }
                 };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 7c79368..4dbd91d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -140,7 +140,7 @@ public class CompactionController
 
     public String getKeyspace()
     {
-        return cfs.table.getName();
+        return cfs.keyspace.getName();
     }
 
     public String getColumnFamily()
@@ -192,7 +192,7 @@ public class CompactionController
         {
             String keyString = cfs.metadata.getKeyValidator().getString(rows.get(0).getKey().key);
             logger.info(String.format("Compacting large row %s/%s:%s (%d bytes) incrementally",
-                                      cfs.table.getName(), cfs.name, keyString, rowSize));
+                                      cfs.keyspace.getName(), cfs.name, keyString, rowSize));
             return new LazilyCompactedRow(this, rows);
         }
         return new PrecompactedRow(this, rows);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 06dd95d..3641945 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Bounds;
@@ -137,12 +137,12 @@ public class CompactionManager implements CompactionManagerMBean
         if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize())
         {
             logger.debug("Background compaction is still running for {}.{} ({} remaining). Skipping",
-                         cfs.table.getName(), cfs.name, count);
+                         cfs.keyspace.getName(), cfs.name, count);
             return Collections.emptyList();
         }
 
         logger.debug("Scheduling a background task check for {}.{} with {}",
-                     cfs.table.getName(),
+                     cfs.keyspace.getName(),
                      cfs.name,
                      cfs.getCompactionStrategy().getClass().getSimpleName());
         List<Future<?>> futures = new ArrayList<Future<?>>();
@@ -180,7 +180,7 @@ public class CompactionManager implements CompactionManagerMBean
         {
             try
             {
-                logger.debug("Checking {}.{}", cfs.table.getName(), cfs.name);
+                logger.debug("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
                 if (!cfs.isValid())
                 {
                     logger.debug("Aborting compaction for dropped CF");
@@ -322,7 +322,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         for (Pair<String, String> key : descriptors.keySet())
         {
-            ColumnFamilyStore cfs = Table.open(key.left).getColumnFamilyStore(key.right);
+            ColumnFamilyStore cfs = Keyspace.open(key.left).getColumnFamilyStore(key.right);
             submitUserDefined(cfs, descriptors.get(key), getDefaultGcBefore(cfs));
         }
     }
@@ -409,9 +409,9 @@ public class CompactionManager implements CompactionManagerMBean
     /* Used in tests. */
     public void disableAutoCompaction()
     {
-        for (String ksname : Schema.instance.getNonSystemTables())
+        for (String ksname : Schema.instance.getNonSystemKeyspaces())
         {
-            for (ColumnFamilyStore cfs : Table.open(ksname).getColumnFamilyStores())
+            for (ColumnFamilyStore cfs : Keyspace.open(ksname).getColumnFamilyStores())
                 cfs.disableAutoCompaction();
         }
     }
@@ -464,8 +464,8 @@ public class CompactionManager implements CompactionManagerMBean
     private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
     {
         assert !cfs.isIndex();
-        Table table = cfs.table;
-        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(table.getName());
+        Keyspace keyspace = cfs.keyspace;
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
         if (ranges.isEmpty())
         {
             logger.info("Cleanup cannot run before a node has joined the ring");
@@ -549,14 +549,14 @@ public class CompactionManager implements CompactionManagerMBean
                             if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                             {
                                 // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
-                                Table.switchLock.readLock().lock();
+                                Keyspace.switchLock.readLock().lock();
                                 try
                                 {
                                     cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
                                 }
                                 finally
                                 {
-                                    Table.switchLock.readLock().unlock();
+                                    Keyspace.switchLock.readLock().unlock();
                                 }
                             }
                         }
@@ -643,7 +643,7 @@ public class CompactionManager implements CompactionManagerMBean
         else
         {
             // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceTableFlush(cfs.table.getName(), cfs.name);
+            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
 
             // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
             // instead so they won't be cleaned up if they do get compacted during the validation
@@ -674,8 +674,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             SSTableReader.releaseReferences(sstables);
             iter.close();
-            if (cfs.table.snapshotExists(snapshotName))
-                cfs.table.clearSnapshot(snapshotName);
+            if (cfs.keyspace.snapshotExists(snapshotName))
+                cfs.keyspace.clearSnapshot(snapshotName);
 
             metrics.finishCompaction(ci);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f07a1e7..a7b6c64 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -104,7 +104,7 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.name);
 
-        UUID taskId = SystemTable.startCompaction(cfs, toCompact);
+        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
 
         CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
         Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
@@ -222,7 +222,7 @@ public class CompactionTask extends AbstractCompactionTask
             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
             // (in replaceCompactedSSTables)
             if (taskId != null)
-                SystemTable.finishCompaction(taskId);
+                SystemKeyspace.finishCompaction(taskId);
 
             if (collector != null)
                 collector.finishCompaction(ci);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
index 1bb4619..8403e75 100644
--- a/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
@@ -72,7 +72,7 @@ public class LegacyLeveledManifest
     /**
      * We need to migrate if there is a legacy leveledmanifest json-file
      * <p/>
-     * If there is no jsonfile, we can just start as normally, sstable level will be at 0 for all tables.
+     * If there is no jsonfile, we can just start as normally, sstable level will be at 0 for all sstables.
      *
      * @param keyspace
      * @param columnFamily

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index f43c13e..c0404e3 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -479,7 +479,7 @@ public class LeveledManifest
             }
         }
 
-        // look for a non-suspect table to compact with, starting with where we left off last time,
+        // look for a non-suspect keyspace to compact with, starting with where we left off last time,
         // and wrapping back to the beginning of the generation if necessary
         for (int i = 0; i < generations[level].size(); i++)
         {
@@ -541,7 +541,7 @@ public class LeveledManifest
         }
 
         logger.debug("Estimating {} compactions to do for {}.{}",
-                     Arrays.toString(estimated), cfs.table.getName(), cfs.name);
+                     Arrays.toString(estimated), cfs.keyspace.getName(), cfs.name);
         return Ints.checkedCast(tasks);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 3678184..cee5f97 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -163,10 +163,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     public static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Iterable<SSTableReader> sstables)
     {
-        List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(Iterables.size(sstables));
-        for(SSTableReader table: sstables)
-            tableLengthPairs.add(Pair.create(table, table.onDiskLength()));
-        return tableLengthPairs;
+        List<Pair<SSTableReader, Long>> sstableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(Iterables.size(sstables));
+        for(SSTableReader sstable : sstables)
+            sstableLengthPairs.add(Pair.create(sstable, sstable.onDiskLength()));
+        return sstableLengthPairs;
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 4d0914a..73f818f 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -49,7 +49,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
         AbstractType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
-        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
+        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
                                                              new LocalPartitioner(columnDef.getValidator()),
                                                              indexedCfMetadata);
@@ -60,7 +60,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         {
             /*
              * # of index CF's key = cardinality of indexed column.
-             * if # of keys stored in index CF is more than average column counts (means tall table),
+             * if # of keys stored in index CF is more than average column counts (means tall keyspaceName),
              * then consider it as high cardinality.
              */
             double estimatedKeys = indexCfs.estimateKeys();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index 991581d..e77bd0f 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -52,7 +52,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      */
     public abstract void update(ByteBuffer rowKey, Column col);
 
-    public String getNameForSystemTable(ByteBuffer column)
+    public String getNameForSystemKeyspace(ByteBuffer column)
     {
         return getIndexName();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index 1dd2de7..f241447 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -53,7 +53,7 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex
     public abstract void delete(DecoratedKey key);
 
     @Override
-    public String getNameForSystemTable(ByteBuffer columnName)
+    public String getNameForSystemKeyspace(ByteBuffer columnName)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 40ff1cc..c712252 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.db.index.composites.CompositesIndex;
@@ -97,12 +97,12 @@ public abstract class SecondaryIndex
 
     /**
      * Return the unique name for this index and column
-     * to be stored in the SystemTable that tracks if each column is built
+     * to be stored in the SystemKeyspace that tracks if each column is built
      *
      * @param columnName the name of the column
      * @return the unique name
      */
-    abstract public String getNameForSystemTable(ByteBuffer columnName);
+    abstract public String getNameForSystemKeyspace(ByteBuffer columnName);
 
     /**
      * Checks if the index for specified column is fully built
@@ -112,19 +112,19 @@ public abstract class SecondaryIndex
      */
     public boolean isIndexBuilt(ByteBuffer columnName)
     {
-        return SystemTable.isIndexBuilt(baseCfs.table.getName(), getNameForSystemTable(columnName));
+        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
     }
 
     public void setIndexBuilt()
     {
         for (ColumnDefinition columnDef : columnDefs)
-            SystemTable.setIndexBuilt(baseCfs.table.getName(), getNameForSystemTable(columnDef.name));
+            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name));
     }
 
     public void setIndexRemoved()
     {
         for (ColumnDefinition columnDef : columnDefs)
-            SystemTable.setIndexRemoved(baseCfs.table.getName(), getNameForSystemTable(columnDef.name));
+            SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name));
     }
 
     /**
@@ -204,7 +204,7 @@ public abstract class SecondaryIndex
         boolean allAreBuilt = true;
         for (ColumnDefinition cdef : columnDefs)
         {
-            if (!SystemTable.isIndexBuilt(baseCfs.table.getName(), getNameForSystemTable(cdef.name)))
+            if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name)))
             {
                 allAreBuilt = false;
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index c418a9c..eb09e43 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -22,7 +22,7 @@ import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
@@ -59,7 +59,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
             if (isStopRequested())
                 throw new CompactionInterruptedException(getCompactionInfo());
             DecoratedKey key = iter.next();
-            Table.indexRow(key, cfs, idxNames);
+            Keyspace.indexRow(key, cfs, idxNames);
         }
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index a40f4bd..4fab92b 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -225,7 +225,7 @@ public class SecondaryIndexManager
         }
 
         index.removeIndex(column);
-        SystemTable.setIndexRemoved(baseCfs.metadata.ksName, index.getNameForSystemTable(column));
+        SystemKeyspace.setIndexRemoved(baseCfs.metadata.ksName, index.getNameForSystemKeyspace(column));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/dht/AbstractByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/AbstractByteOrderedPartitioner.java
index d037518..d01889a 100644
--- a/src/java/org/apache/cassandra/dht/AbstractByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/AbstractByteOrderedPartitioner.java
@@ -191,7 +191,7 @@ public abstract class AbstractByteOrderedPartitioner extends AbstractPartitioner
             lastToken = node;
         }
 
-        for (String ks : Schema.instance.getTables())
+        for (String ks : Schema.instance.getKeyspaces())
         {
             for (CFMetaData cfmd : Schema.instance.getKSMetaData(ks).cfMetaData().values())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 55d82e1..57d8d94 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
@@ -66,10 +66,10 @@ public class BootStrapper
         RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap");
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
-        for (String table : Schema.instance.getNonSystemTables())
+        for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
         {
-            AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
-            streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
+            AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
+            streamer.addRanges(keyspaceName, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
         }
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 57c7297..b37c924 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -184,7 +184,7 @@ public class OrderPreservingPartitioner extends AbstractPartitioner<StringToken>
             lastToken = node;
         }
 
-        for (String ks : Schema.instance.getTables())
+        for (String ks : Schema.instance.getKeyspaces())
         {
             for (CFMetaData cfmd : Schema.instance.getKSMetaData(ks).cfMetaData().values())
             {