You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:12 UTC

[3/5] cassandra git commit: Isolate schema serialization code

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3e8b0a2..503dd7f 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -26,27 +26,20 @@ import java.util.concurrent.TimeUnit;
 import javax.management.openmbean.*;
 
 import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -54,6 +47,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
@@ -70,155 +64,21 @@ public final class SystemKeyspace
 
     public static final String NAME = "system";
 
-    public static final String SCHEMA_KEYSPACES_TABLE = "schema_keyspaces";
-    public static final String SCHEMA_COLUMNFAMILIES_TABLE = "schema_columnfamilies";
-    public static final String SCHEMA_COLUMNS_TABLE = "schema_columns";
-    public static final String SCHEMA_TRIGGERS_TABLE = "schema_triggers";
-    public static final String SCHEMA_USER_TYPES_TABLE = "schema_usertypes";
-    public static final String SCHEMA_FUNCTIONS_TABLE = "schema_functions";
-    public static final String SCHEMA_AGGREGATES_TABLE = "schema_aggregates";
-
-    public static final String BUILT_INDEXES_TABLE = "IndexInfo";
-    public static final String HINTS_TABLE = "hints";
-    public static final String BATCHLOG_TABLE = "batchlog";
-    public static final String PAXOS_TABLE = "paxos";
-    public static final String LOCAL_TABLE = "local";
-    public static final String PEERS_TABLE = "peers";
-    public static final String PEER_EVENTS_TABLE = "peer_events";
-    public static final String RANGE_XFERS_TABLE = "range_xfers";
-    public static final String COMPACTION_LOG_TABLE = "compactions_in_progress";
-    public static final String COMPACTION_HISTORY_TABLE = "compaction_history";
-    public static final String SSTABLE_ACTIVITY_TABLE = "sstable_activity";
-
-    public static final List<String> ALL_SCHEMA_TABLES =
-        Arrays.asList(SCHEMA_KEYSPACES_TABLE,
-                      SCHEMA_COLUMNFAMILIES_TABLE,
-                      SCHEMA_COLUMNS_TABLE,
-                      SCHEMA_TRIGGERS_TABLE,
-                      SCHEMA_USER_TYPES_TABLE,
-                      SCHEMA_FUNCTIONS_TABLE,
-                      SCHEMA_AGGREGATES_TABLE);
-
-    private static int WEEK = (int) TimeUnit.DAYS.toSeconds(7);
-
-    public static final CFMetaData SchemaKeyspacesTable =
-        compile(SCHEMA_KEYSPACES_TABLE, "keyspace definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "durable_writes boolean,"
-                + "strategy_class text,"
-                + "strategy_options text,"
-                + "PRIMARY KEY ((keyspace_name))) "
-                + "WITH COMPACT STORAGE")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaColumnFamiliesTable =
-        compile(SCHEMA_COLUMNFAMILIES_TABLE, "table definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "columnfamily_name text,"
-                + "bloom_filter_fp_chance double,"
-                + "caching text,"
-                + "cf_id uuid," // post-2.1 UUID cfid
-                + "comment text,"
-                + "compaction_strategy_class text,"
-                + "compaction_strategy_options text,"
-                + "comparator text,"
-                + "compression_parameters text,"
-                + "default_time_to_live int,"
-                + "default_validator text,"
-                + "dropped_columns map<text, bigint>,"
-                + "gc_grace_seconds int,"
-                + "is_dense boolean,"
-                + "key_validator text,"
-                + "local_read_repair_chance double,"
-                + "max_compaction_threshold int,"
-                + "max_index_interval int,"
-                + "memtable_flush_period_in_ms int,"
-                + "min_compaction_threshold int,"
-                + "min_index_interval int,"
-                + "read_repair_chance double,"
-                + "speculative_retry text,"
-                + "subcomparator text,"
-                + "type text,"
-                + "PRIMARY KEY ((keyspace_name), columnfamily_name))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaColumnsTable =
-        compile(SCHEMA_COLUMNS_TABLE, "column definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "columnfamily_name text,"
-                + "column_name text,"
-                + "component_index int,"
-                + "index_name text,"
-                + "index_options text,"
-                + "index_type text,"
-                + "type text,"
-                + "validator text,"
-                + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaTriggersTable =
-        compile(SCHEMA_TRIGGERS_TABLE, "trigger definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "columnfamily_name text,"
-                + "trigger_name text,"
-                + "trigger_options map<text, text>,"
-                + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaUserTypesTable =
-        compile(SCHEMA_USER_TYPES_TABLE, "user defined type definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "type_name text,"
-                + "field_names list<text>,"
-                + "field_types list<text>,"
-                + "PRIMARY KEY ((keyspace_name), type_name))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaFunctionsTable =
-        compile(SCHEMA_FUNCTIONS_TABLE, "user defined function definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "function_name text,"
-                + "signature blob,"
-                + "argument_names list<text>,"
-                + "argument_types list<text>,"
-                + "body text,"
-                + "deterministic boolean,"
-                + "language text,"
-                + "return_type text,"
-                + "PRIMARY KEY ((keyspace_name), function_name, signature))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData SchemaAggregatesTable =
-        compile(SCHEMA_AGGREGATES_TABLE, "user defined aggregate definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "aggregate_name text,"
-                + "signature blob,"
-                + "argument_types list<text>,"
-                + "return_type text,"
-                + "state_func text,"
-                + "state_type text,"
-                + "final_func text,"
-                + "initcond blob,"
-                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))")
-                .gcGraceSeconds(WEEK);
-
-    public static final CFMetaData BuiltIndexesTable =
-        compile(BUILT_INDEXES_TABLE, "built column indexes",
-                "CREATE TABLE \"%s\" ("
-                + "table_name text,"
-                + "index_name text,"
-                + "PRIMARY KEY ((table_name), index_name)) "
-                + "WITH COMPACT STORAGE");
-
-    public static final CFMetaData HintsTable =
-        compile(HINTS_TABLE, "hints awaiting delivery",
+    public static final String HINTS = "hints";
+    public static final String BATCHLOG = "batchlog";
+    public static final String PAXOS = "paxos";
+    public static final String BUILT_INDEXES = "IndexInfo";
+    public static final String LOCAL = "local";
+    public static final String PEERS = "peers";
+    public static final String PEER_EVENTS = "peer_events";
+    public static final String RANGE_XFERS = "range_xfers";
+    public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress";
+    public static final String COMPACTION_HISTORY = "compaction_history";
+    public static final String SSTABLE_ACTIVITY = "sstable_activity";
+
+    public static final CFMetaData Hints =
+        compile(HINTS,
+                "hints awaiting delivery",
                 "CREATE TABLE %s ("
                 + "target_id uuid,"
                 + "hint_id timeuuid,"
@@ -229,8 +89,9 @@ public final class SystemKeyspace
                 .compactionStrategyOptions(Collections.singletonMap("enabled", "false"))
                 .gcGraceSeconds(0);
 
-    public static final CFMetaData BatchlogTable =
-        compile(BATCHLOG_TABLE, "batches awaiting replay",
+    public static final CFMetaData Batchlog =
+        compile(BATCHLOG,
+                "batches awaiting replay",
                 "CREATE TABLE %s ("
                 + "id uuid,"
                 + "data blob,"
@@ -240,8 +101,9 @@ public final class SystemKeyspace
                 .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2"))
                 .gcGraceSeconds(0);
 
-    private static final CFMetaData PaxosTable =
-        compile(PAXOS_TABLE, "in-progress paxos proposals",
+    private static final CFMetaData Paxos =
+        compile(PAXOS,
+                "in-progress paxos proposals",
                 "CREATE TABLE %s ("
                 + "row_key blob,"
                 + "cf_id UUID,"
@@ -253,8 +115,19 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((row_key), cf_id))")
                 .compactionStrategyClass(LeveledCompactionStrategy.class);
 
-    private static final CFMetaData LocalTable =
-        compile(LOCAL_TABLE, "information about the local node",
+    // TODO: make private
+    public static final CFMetaData BuiltIndexes =
+        compile(BUILT_INDEXES,
+                "built column indexes",
+                "CREATE TABLE \"%s\" ("
+                + "table_name text,"
+                + "index_name text,"
+                + "PRIMARY KEY ((table_name), index_name)) "
+                + "WITH COMPACT STORAGE");
+
+    private static final CFMetaData Local =
+        compile(LOCAL,
+                "information about the local node",
                 "CREATE TABLE %s ("
                 + "key text,"
                 + "bootstrapped text,"
@@ -273,8 +146,9 @@ public final class SystemKeyspace
                 + "truncated_at map<uuid, blob>,"
                 + "PRIMARY KEY ((key)))");
 
-    private static final CFMetaData PeersTable =
-        compile(PEERS_TABLE, "information about known peers in the cluster",
+    private static final CFMetaData Peers =
+        compile(PEERS,
+                "information about known peers in the cluster",
                 "CREATE TABLE %s ("
                 + "peer inet,"
                 + "data_center text,"
@@ -287,22 +161,25 @@ public final class SystemKeyspace
                 + "tokens set<varchar>,"
                 + "PRIMARY KEY ((peer)))");
 
-    private static final CFMetaData PeerEventsTable =
-        compile(PEER_EVENTS_TABLE, "events related to peers",
+    private static final CFMetaData PeerEvents =
+        compile(PEER_EVENTS,
+                "events related to peers",
                 "CREATE TABLE %s ("
                 + "peer inet,"
                 + "hints_dropped map<uuid, int>,"
                 + "PRIMARY KEY ((peer)))");
 
-    private static final CFMetaData RangeXfersTable =
-        compile(RANGE_XFERS_TABLE, "ranges requested for transfer",
+    private static final CFMetaData RangeXfers =
+        compile(RANGE_XFERS,
+                "ranges requested for transfer",
                 "CREATE TABLE %s ("
                 + "token_bytes blob,"
                 + "requested_at timestamp,"
                 + "PRIMARY KEY ((token_bytes)))");
 
-    private static final CFMetaData CompactionLogTable =
-        compile(COMPACTION_LOG_TABLE, "unfinished compactions",
+    private static final CFMetaData CompactionsInProgress =
+        compile(COMPACTIONS_IN_PROGRESS,
+                "unfinished compactions",
                 "CREATE TABLE %s ("
                 + "id uuid,"
                 + "columnfamily_name text,"
@@ -310,8 +187,9 @@ public final class SystemKeyspace
                 + "keyspace_name text,"
                 + "PRIMARY KEY ((id)))");
 
-    private static final CFMetaData CompactionHistoryTable =
-        compile(COMPACTION_HISTORY_TABLE, "week-long compaction history",
+    private static final CFMetaData CompactionHistory =
+        compile(COMPACTION_HISTORY,
+                "week-long compaction history",
                 "CREATE TABLE %s ("
                 + "id uuid,"
                 + "bytes_in bigint,"
@@ -321,10 +199,11 @@ public final class SystemKeyspace
                 + "keyspace_name text,"
                 + "rows_merged map<int, bigint>,"
                 + "PRIMARY KEY ((id)))")
-                .defaultTimeToLive(WEEK);
+                .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7));
 
-    private static final CFMetaData SSTableActivityTable =
-        compile(SSTABLE_ACTIVITY_TABLE, "historic sstable read rates",
+    private static final CFMetaData SSTableActivity =
+        compile(SSTABLE_ACTIVITY,
+                "historic sstable read rates",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "columnfamily_name text,"
@@ -333,37 +212,29 @@ public final class SystemKeyspace
                 + "rate_15m double,"
                 + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))");
 
-    private static CFMetaData compile(String table, String comment, String cql)
+    private static CFMetaData compile(String name, String description, String schema)
     {
-        return CFMetaData.compile(String.format(cql, table), NAME).comment(comment);
+        return CFMetaData.compile(String.format(schema, name), NAME).comment(description);
     }
 
     public static KSMetaData definition()
     {
-        List<CFMetaData> tables =
-            Arrays.asList(SchemaKeyspacesTable,
-                          SchemaColumnFamiliesTable,
-                          SchemaColumnsTable,
-                          SchemaTriggersTable,
-                          SchemaUserTypesTable,
-                          SchemaFunctionsTable,
-                          SchemaAggregatesTable,
-                          BuiltIndexesTable,
-                          HintsTable,
-                          BatchlogTable,
-                          PaxosTable,
-                          LocalTable,
-                          PeersTable,
-                          PeerEventsTable,
-                          RangeXfersTable,
-                          CompactionLogTable,
-                          CompactionHistoryTable,
-                          SSTableActivityTable);
+        Iterable<CFMetaData> tables =
+            Iterables.concat(LegacySchemaTables.All,
+                             Arrays.asList(BuiltIndexes,
+                                           Hints,
+                                           Batchlog,
+                                           Paxos,
+                                           Local,
+                                           Peers,
+                                           PeerEvents,
+                                           RangeXfers,
+                                           CompactionsInProgress,
+                                           CompactionHistory,
+                                           SSTableActivity));
         return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
     }
 
-    private static final String LOCAL_KEY = "local";
-
     private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
 
     public enum BootstrapState
@@ -381,24 +252,15 @@ public final class SystemKeyspace
     public static void finishStartup()
     {
         setupVersion();
-
-        // add entries to system schema columnfamilies for the hardcoded system definitions
-        KSMetaData ksmd = Schema.instance.getKSMetaData(NAME);
-
-        // delete old, possibly obsolete entries in schema tables
-        for (String table : ALL_SCHEMA_TABLES)
-            executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", table), ksmd.name);
-
-        // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
-        ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
+        LegacySchemaTables.saveSystemKeyspaceSchema();
     }
 
     private static void setupVersion()
     {
         String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        executeOnceInternal(String.format(req, LOCAL_TABLE),
-                            LOCAL_KEY,
+        executeOnceInternal(String.format(req, LOCAL),
+                            LOCAL,
                             FBUtilities.getReleaseVersionString(),
                             QueryProcessor.CQL_VERSION.toString(),
                             cassandraConstants.VERSION,
@@ -429,8 +291,8 @@ public final class SystemKeyspace
             }
         });
         String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTION_LOG_TABLE), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
-        forceBlockingFlush(COMPACTION_LOG_TABLE);
+        executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
+        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
         return compactionId;
     }
 
@@ -443,8 +305,8 @@ public final class SystemKeyspace
     {
         assert taskId != null;
 
-        executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTION_LOG_TABLE), taskId);
-        forceBlockingFlush(COMPACTION_LOG_TABLE);
+        executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
+        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
     }
 
     /**
@@ -454,7 +316,7 @@ public final class SystemKeyspace
     public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
     {
         String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTION_LOG_TABLE));
+        UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
 
         Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
         for (UntypedResultSet.Row row : resultSet)
@@ -479,7 +341,7 @@ public final class SystemKeyspace
 
     public static void discardCompactionsInProgress()
     {
-        ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTION_LOG_TABLE);
+        ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
         compactionLog.truncateBlocking();
     }
 
@@ -491,24 +353,24 @@ public final class SystemKeyspace
                                                Map<Integer, Long> rowsMerged)
     {
         // don't write anything when the history table itself is compacted, since that would in turn cause new compactions
-        if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_TABLE))
+        if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
             return;
         String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTION_HISTORY_TABLE), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
+        executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
     }
 
     public static TabularData getCompactionHistory() throws OpenDataException
     {
-        UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY_TABLE));
+        UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
         return CompactionHistoryTabularData.from(queryResultSet);
     }
 
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), truncationAsMapEntry(cfs, truncatedAt, position));
+        executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
         truncationRecords = null;
-        forceBlockingFlush(LOCAL_TABLE);
+        forceBlockingFlush(LOCAL);
     }
 
     /**
@@ -517,9 +379,9 @@ public final class SystemKeyspace
     public static synchronized void removeTruncationRecord(UUID cfId)
     {
         String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), cfId);
+        executeInternal(String.format(req, LOCAL, LOCAL), cfId);
         truncationRecords = null;
-        forceBlockingFlush(LOCAL_TABLE);
+        forceBlockingFlush(LOCAL);
     }
 
     private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
@@ -558,7 +420,7 @@ public final class SystemKeyspace
 
     private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
     {
-        UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
 
         Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
 
@@ -597,14 +459,14 @@ public final class SystemKeyspace
         }
 
         String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS_TABLE), ep, tokensAsSet(tokens));
+        executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
     }
 
     public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
     {
         String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS_TABLE), ep, preferred_ip);
-        forceBlockingFlush(PEERS_TABLE);
+        executeInternal(String.format(req, PEERS), ep, preferred_ip);
+        forceBlockingFlush(PEERS);
     }
 
     public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
@@ -613,20 +475,20 @@ public final class SystemKeyspace
             return;
 
         String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS_TABLE, columnName), ep, value);
+        executeInternal(String.format(req, PEERS, columnName), ep, value);
     }
 
     public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
     {
         // with 30 day TTL
         String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
-        executeInternal(String.format(req, PEER_EVENTS_TABLE), timePeriod, value, ep);
+        executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, ep);
     }
 
     public static synchronized void updateSchemaVersion(UUID version)
     {
         String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), version);
+        executeInternal(String.format(req, LOCAL, LOCAL), version);
     }
 
     private static Set<String> tokensAsSet(Collection<Token> tokens)
@@ -653,7 +515,7 @@ public final class SystemKeyspace
     public static synchronized void removeEndpoint(InetAddress ep)
     {
         String req = "DELETE FROM system.%s WHERE peer = ?";
-        executeInternal(String.format(req, PEERS_TABLE), ep);
+        executeInternal(String.format(req, PEERS), ep);
     }
 
     /**
@@ -663,8 +525,8 @@ public final class SystemKeyspace
     {
         assert !tokens.isEmpty() : "removeEndpoint should be used instead";
         String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), tokensAsSet(tokens));
-        forceBlockingFlush(LOCAL_TABLE);
+        executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens));
+        forceBlockingFlush(LOCAL);
     }
 
     /**
@@ -696,7 +558,7 @@ public final class SystemKeyspace
     public static SetMultimap<InetAddress, Token> loadTokens()
     {
         SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS_TABLE))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("tokens"))
@@ -713,7 +575,7 @@ public final class SystemKeyspace
     public static Map<InetAddress, UUID> loadHostIds()
     {
         Map<InetAddress, UUID> hostIdMap = new HashMap<>();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS_TABLE))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("host_id"))
@@ -733,7 +595,7 @@ public final class SystemKeyspace
     public static InetAddress getPreferredIP(InetAddress ep)
     {
         String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
-        UntypedResultSet result = executeInternal(String.format(req, PEERS_TABLE), ep);
+        UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
         if (!result.isEmpty() && result.one().has("preferred_ip"))
             return result.one().getInetAddress("preferred_ip");
         return ep;
@@ -745,7 +607,7 @@ public final class SystemKeyspace
     public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
     {
         Map<InetAddress, Map<String, String>> result = new HashMap<>();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS_TABLE))
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
         {
             InetAddress peer = row.getInetAddress("peer");
             if (row.has("data_center") && row.has("rack"))
@@ -780,10 +642,10 @@ public final class SystemKeyspace
             ex.initCause(err);
             throw ex;
         }
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_TABLE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL);
 
         String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
 
         if (result.isEmpty() || !result.one().has("cluster_name"))
         {
@@ -793,7 +655,7 @@ public final class SystemKeyspace
 
             // no system files.  this is a new node.
             req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)";
-            executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), DatabaseDescriptor.getClusterName());
+            executeInternal(String.format(req, LOCAL, LOCAL), DatabaseDescriptor.getClusterName());
             return;
         }
 
@@ -805,7 +667,7 @@ public final class SystemKeyspace
     public static Collection<Token> getSavedTokens()
     {
         String req = "SELECT tokens FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
         return result.isEmpty() || !result.one().has("tokens")
              ? Collections.<Token>emptyList()
              : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
@@ -814,7 +676,7 @@ public final class SystemKeyspace
     public static int incrementAndGetGeneration()
     {
         String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
 
         int generation;
         if (result.isEmpty() || !result.one().has("gossip_generation"))
@@ -842,8 +704,8 @@ public final class SystemKeyspace
         }
 
         req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), generation);
-        forceBlockingFlush(LOCAL_TABLE);
+        executeInternal(String.format(req, LOCAL, LOCAL), generation);
+        forceBlockingFlush(LOCAL);
 
         return generation;
     }
@@ -851,7 +713,7 @@ public final class SystemKeyspace
     public static BootstrapState getBootstrapState()
     {
         String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
 
         if (result.isEmpty() || !result.one().has("bootstrapped"))
             return BootstrapState.NEEDS_BOOTSTRAP;
@@ -872,15 +734,15 @@ public final class SystemKeyspace
     public static void setBootstrapState(BootstrapState state)
     {
         String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), state.name());
-        forceBlockingFlush(LOCAL_TABLE);
+        executeInternal(String.format(req, LOCAL, LOCAL), state.name());
+        forceBlockingFlush(LOCAL);
     }
 
     public static boolean isIndexBuilt(String keyspaceName, String indexName)
     {
-        ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES_TABLE);
+        ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
         QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
-                                                        BUILT_INDEXES_TABLE,
+                                                        BUILT_INDEXES,
                                                         FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
                                                         System.currentTimeMillis());
         return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
@@ -888,7 +750,7 @@ public final class SystemKeyspace
 
     public static void setIndexBuilt(String keyspaceName, String indexName)
     {
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES_TABLE);
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES);
         cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
         new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply();
     }
@@ -896,7 +758,7 @@ public final class SystemKeyspace
     public static void setIndexRemoved(String keyspaceName, String indexName)
     {
         Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName));
-        mutation.delete(BUILT_INDEXES_TABLE, BuiltIndexesTable.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
+        mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
         mutation.apply();
     }
 
@@ -907,7 +769,7 @@ public final class SystemKeyspace
     public static UUID getLocalHostId()
     {
         String req = "SELECT host_id FROM system.%s WHERE key='%s'";
-        UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
 
         // Look up the Host UUID (return it if found)
         if (!result.isEmpty() && result.one().has("host_id"))
@@ -925,144 +787,14 @@ public final class SystemKeyspace
     public static UUID setLocalHostId(UUID hostId)
     {
         String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
-        executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), hostId);
+        executeInternal(String.format(req, LOCAL, LOCAL), hostId);
         return hostId;
     }
 
-    /**
-     * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
-     * @return CFS responsible to hold low-level serialized schema
-     */
-    public static ColumnFamilyStore schemaCFS(String cfName)
-    {
-        return Keyspace.open(NAME).getColumnFamilyStore(cfName);
-    }
-
-    public static List<Row> serializedSchema()
-    {
-        List<Row> schema = new ArrayList<>();
-
-        for (String cf : ALL_SCHEMA_TABLES)
-            schema.addAll(serializedSchema(cf));
-
-        return schema;
-    }
-
-    /**
-     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
-     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
-     */
-    public static List<Row> serializedSchema(String schemaCfName)
-    {
-        Token minToken = StorageService.getPartitioner().getMinimumToken();
-
-        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
-                                                     null,
-                                                     new IdentityQueryFilter(),
-                                                     Integer.MAX_VALUE,
-                                                     System.currentTimeMillis());
-    }
-
-    public static Collection<Mutation> serializeSchema()
-    {
-        Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
-
-        for (String cf : ALL_SCHEMA_TABLES)
-            serializeSchema(mutationMap, cf);
-
-        return mutationMap.values();
-    }
-
-    private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName)
-    {
-        for (Row schemaRow : serializedSchema(schemaCfName))
-        {
-            if (Schema.ignoredSchemaRow(schemaRow))
-                continue;
-
-            Mutation mutation = mutationMap.get(schemaRow.key);
-            if (mutation == null)
-            {
-                mutation = new Mutation(NAME, schemaRow.key.getKey());
-                mutationMap.put(schemaRow.key, mutation);
-            }
-
-            mutation.add(schemaRow.cf);
-        }
-    }
-
-    public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
-    {
-        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
-
-        for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
-            schema.put(schemaEntity.key, schemaEntity.cf);
-
-        return schema;
-    }
-
-    public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces)
-    {
-        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
-
-        for (String keyspace : keyspaces)
-        {
-            Row schemaEntity = readSchemaRow(schemaCfName, keyspace);
-            if (schemaEntity.cf != null)
-                schema.put(schemaEntity.key, schemaEntity.cf);
-        }
-
-        return schema;
-    }
-
-    public static ByteBuffer getSchemaKSKey(String ksName)
-    {
-        return AsciiType.instance.fromString(ksName);
-    }
-
-    /**
-     * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace.
-     *
-     * @param schemaCfName the schema table to get the data from (schema_keyspaces, schema_columnfamilies, schema_columns or schema_triggers)
-     * @param ksName the keyspace of the tables we are interested in
-     * @return a Row containing the schema data of a particular type for the keyspace
-     */
-    public static Row readSchemaRow(String schemaCfName, String ksName)
-    {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
-
-        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
-        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, schemaCfName, System.currentTimeMillis()));
-
-        return new Row(key, result);
-    }
-
-    /**
-     * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair.
-     *
-     * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers)
-     * @param ksName the keyspace of the table we are interested in
-     * @param cfName the table we are interested in
-     * @return a Row containing the schema data of a particular type for the table
-     */
-    public static Row readSchemaRow(String schemaCfName, String ksName, String cfName)
-    {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
-        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
-        Composite prefix = schemaCFS.getComparator().make(cfName);
-        ColumnFamily cf = schemaCFS.getColumnFamily(key,
-                                                    prefix,
-                                                    prefix.end(),
-                                                    false,
-                                                    Integer.MAX_VALUE,
-                                                    System.currentTimeMillis());
-        return new Row(key, cf);
-    }
-
     public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS_TABLE), key, metadata.cfId);
+        UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -1083,7 +815,7 @@ public final class SystemKeyspace
     public static void savePaxosPromise(Commit promise)
     {
         String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
-        executeInternal(String.format(req, PAXOS_TABLE),
+        executeInternal(String.format(req, PAXOS),
                         UUIDGen.microsTimestamp(promise.ballot),
                         paxosTtl(promise.update.metadata),
                         promise.ballot,
@@ -1093,7 +825,7 @@ public final class SystemKeyspace
 
     public static void savePaxosProposal(Commit proposal)
     {
-        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS_TABLE),
+        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
                         UUIDGen.microsTimestamp(proposal.ballot),
                         paxosTtl(proposal.update.metadata),
                         proposal.ballot,
@@ -1113,7 +845,7 @@ public final class SystemKeyspace
         // We always erase the last proposal (with the commit timestamp, so as to not erase a more recent proposal in case the commit is old)
         // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
         String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
-        executeInternal(String.format(cql, PAXOS_TABLE),
+        executeInternal(String.format(cql, PAXOS),
                         UUIDGen.microsTimestamp(commit.ballot),
                         paxosTtl(commit.update.metadata),
                         commit.ballot,
@@ -1132,7 +864,7 @@ public final class SystemKeyspace
     public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
     {
         String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
-        UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation);
+        UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
 
         if (results.isEmpty())
             return new RestorableMeter();
@@ -1150,7 +882,7 @@ public final class SystemKeyspace
     {
         // Store values with a ten-day TTL (864000 seconds) to handle corner cases where cleanup might not occur
         String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
-        executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE),
+        executeInternal(String.format(cql, SSTABLE_ACTIVITY),
                         keyspace,
                         table,
                         generation,
@@ -1164,6 +896,6 @@ public final class SystemKeyspace
     public static void clearSSTableReadMeter(String keyspace, String table, int generation)
     {
         String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
-        executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation);
+        executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 0b52904..8be9a18 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,17 +24,19 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -228,17 +230,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             {
                 partitioner = FBUtilities.newPartitioner(client.describe_partitioner());           
                 // get CF meta data
-                String query = "SELECT comparator," +
-                               "       subcomparator," +
-                               "       type " +
-                               "FROM system.schema_columnfamilies " +
-                               "WHERE keyspace_name = '%s' " +
-                               "  AND columnfamily_name = '%s' ";
-
-                CqlResult result = client.execute_cql3_query(
-                                        ByteBufferUtil.bytes(String.format(query, keyspace, cfName)),
-                                        Compression.NONE,
-                                        ConsistencyLevel.ONE);
+                String query = String.format("SELECT comparator, subcomparator, type " +
+                                             "FROM %s.%s " +
+                                             "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                                             SystemKeyspace.NAME,
+                                             LegacySchemaTables.COLUMNFAMILIES,
+                                             keyspace,
+                                             cfName);
+
+                CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
 
                 Iterator<CqlRow> iteraRow = result.rows.iterator();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 21e30e2..ffaaea9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -22,17 +22,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -40,10 +30,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-
 import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +42,8 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TupleValue;
 import com.datastax.driver.core.UDTValue;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -601,8 +590,15 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     private void fetchKeys()
     {
-        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
-                       keyspace + "' and columnfamily_name='" + cfName + "'";
+        String query = String.format("SELECT column_name, component_index, type " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNS,
+                                     keyspace,
+                                     cfName);
+
+        // get CF meta data
         List<Row> rows = session.execute(query).all();
         if (rows.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 311359a..0956ba5 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
@@ -297,10 +300,6 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
                 {
                     result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
                 }
-                catch (InvalidRequestException e)
-                {
-                    throw new RuntimeException("failed to prepare cql query " + cql, e);
-                }
                 catch (TException e)
                 {
                     throw new RuntimeException("failed to prepare cql query " + cql, e);
@@ -331,18 +330,20 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
         return partitionKey;
     }
 
+    // FIXME
     /** retrieve the key validator from system.schema_columnfamilies table */
     private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
     {
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
         String cfName = ConfigHelper.getOutputColumnFamily(conf);
-        String query = "SELECT key_validator," +
-        		       "       key_aliases," +
-        		       "       column_aliases " +
-                       "FROM system.schema_columnfamilies " +
-                       "WHERE keyspace_name='%s' and columnfamily_name='%s'";
-        String formatted = String.format(query, keyspace, cfName);
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+        String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = '%s' and columnfamily_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNFAMILIES,
+                                     keyspace,
+                                     cfName);
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
 
         Column rawKeyValidator = result.rows.get(0).columns.get(0);
         String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index dc37252..04d207f 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -26,6 +26,8 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.auth.IAuthenticator;
@@ -585,20 +587,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                    IOException
     {
         // get CF meta data
-        String query = "SELECT type," +
-                       "       comparator," +
-                       "       subcomparator," +
-                       "       default_validator," +
-                       "       key_validator," +
-                       "       key_aliases " +
-                       "FROM system.schema_columnfamilies " +
-                       "WHERE keyspace_name = '%s' " +
-                       "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                                Compression.NONE,
-                                ConsistencyLevel.ONE);
+        String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNFAMILIES,
+                                     keyspace,
+                                     column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
 
         if (result == null || result.rows == null || result.rows.isEmpty())
             return null;
@@ -657,18 +654,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             ConfigurationException,
             NotFoundException
     {
-        String query = "SELECT column_name, " +
-                       "       validator, " +
-                       "       index_type, " +
-                       "       type " +
-                       "FROM system.schema_columns " +
-                       "WHERE keyspace_name = '%s' " +
-                       "  AND columnfamily_name = '%s'";
-
-        CqlResult result = client.execute_cql3_query(
-                                   ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                                   Compression.NONE,
-                                   ConsistencyLevel.ONE);
+        String query = String.format("SELECT column_name, validator, index_type, type " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNS,
+                                     keyspace,
+                                     column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
 
         List<CqlRow> rows = result.rows;
         List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 6cd5c66..fca1d43 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -25,6 +25,8 @@ import java.util.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.marshal.*;
@@ -482,21 +484,15 @@ public class CqlStorage extends AbstractCassandraStorage
     protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
             throws Exception
     {
-        String query = "SELECT key_aliases, " +
-                "       column_aliases, " +
-                "       key_validator, " +
-                "       comparator, " +
-                "       keyspace_name, " +
-                "       value_alias, " +
-                "       default_validator " +
-                "FROM system.schema_columnfamilies " +
-                "WHERE keyspace_name = '%s'" +
-                "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                Compression.NONE,
-                ConsistencyLevel.ONE);
+        String query = String.format("SELECT key_aliases, column_aliases, key_validator, comparator, keyspace_name, value_alias, default_validator " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNFAMILIES,
+                                     keyspace,
+                                     column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
 
         if (result == null || result.rows == null || result.rows.isEmpty())
             return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 43cd2c0..ec590f3 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -349,16 +349,16 @@ public class CQLSSTableWriter implements Closeable
                     if (ksm == null)
                     {
                         ksm = KSMetaData.newKeyspace(this.schema.ksName,
-                                AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
-                                ImmutableMap.of("replication_factor", "1"),
-                                true,
-                                Collections.singleton(this.schema));
+                                                     AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+                                                     ImmutableMap.of("replication_factor", "1"),
+                                                     true,
+                                                     Collections.singleton(this.schema));
                         Schema.instance.load(ksm);
                     }
                     else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null)
                     {
                         Schema.instance.load(this.schema);
-                        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema)));
+                        ksm = ksm.cloneWithTableAdded(this.schema);
                         Schema.instance.setKeyspaceDefinition(ksm);
                         Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false);
                     }