You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:12 UTC
[3/5] cassandra git commit: Isolate schema serialization code
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3e8b0a2..503dd7f 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -26,27 +26,20 @@ import java.util.concurrent.TimeUnit;
import javax.management.openmbean.*;
import com.google.common.base.Function;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -54,6 +47,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
@@ -70,155 +64,21 @@ public final class SystemKeyspace
public static final String NAME = "system";
- public static final String SCHEMA_KEYSPACES_TABLE = "schema_keyspaces";
- public static final String SCHEMA_COLUMNFAMILIES_TABLE = "schema_columnfamilies";
- public static final String SCHEMA_COLUMNS_TABLE = "schema_columns";
- public static final String SCHEMA_TRIGGERS_TABLE = "schema_triggers";
- public static final String SCHEMA_USER_TYPES_TABLE = "schema_usertypes";
- public static final String SCHEMA_FUNCTIONS_TABLE = "schema_functions";
- public static final String SCHEMA_AGGREGATES_TABLE = "schema_aggregates";
-
- public static final String BUILT_INDEXES_TABLE = "IndexInfo";
- public static final String HINTS_TABLE = "hints";
- public static final String BATCHLOG_TABLE = "batchlog";
- public static final String PAXOS_TABLE = "paxos";
- public static final String LOCAL_TABLE = "local";
- public static final String PEERS_TABLE = "peers";
- public static final String PEER_EVENTS_TABLE = "peer_events";
- public static final String RANGE_XFERS_TABLE = "range_xfers";
- public static final String COMPACTION_LOG_TABLE = "compactions_in_progress";
- public static final String COMPACTION_HISTORY_TABLE = "compaction_history";
- public static final String SSTABLE_ACTIVITY_TABLE = "sstable_activity";
-
- public static final List<String> ALL_SCHEMA_TABLES =
- Arrays.asList(SCHEMA_KEYSPACES_TABLE,
- SCHEMA_COLUMNFAMILIES_TABLE,
- SCHEMA_COLUMNS_TABLE,
- SCHEMA_TRIGGERS_TABLE,
- SCHEMA_USER_TYPES_TABLE,
- SCHEMA_FUNCTIONS_TABLE,
- SCHEMA_AGGREGATES_TABLE);
-
- private static int WEEK = (int) TimeUnit.DAYS.toSeconds(7);
-
- public static final CFMetaData SchemaKeyspacesTable =
- compile(SCHEMA_KEYSPACES_TABLE, "keyspace definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "durable_writes boolean,"
- + "strategy_class text,"
- + "strategy_options text,"
- + "PRIMARY KEY ((keyspace_name))) "
- + "WITH COMPACT STORAGE")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaColumnFamiliesTable =
- compile(SCHEMA_COLUMNFAMILIES_TABLE, "table definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "bloom_filter_fp_chance double,"
- + "caching text,"
- + "cf_id uuid," // post-2.1 UUID cfid
- + "comment text,"
- + "compaction_strategy_class text,"
- + "compaction_strategy_options text,"
- + "comparator text,"
- + "compression_parameters text,"
- + "default_time_to_live int,"
- + "default_validator text,"
- + "dropped_columns map<text, bigint>,"
- + "gc_grace_seconds int,"
- + "is_dense boolean,"
- + "key_validator text,"
- + "local_read_repair_chance double,"
- + "max_compaction_threshold int,"
- + "max_index_interval int,"
- + "memtable_flush_period_in_ms int,"
- + "min_compaction_threshold int,"
- + "min_index_interval int,"
- + "read_repair_chance double,"
- + "speculative_retry text,"
- + "subcomparator text,"
- + "type text,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaColumnsTable =
- compile(SCHEMA_COLUMNS_TABLE, "column definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "column_name text,"
- + "component_index int,"
- + "index_name text,"
- + "index_options text,"
- + "index_type text,"
- + "type text,"
- + "validator text,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaTriggersTable =
- compile(SCHEMA_TRIGGERS_TABLE, "trigger definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "trigger_name text,"
- + "trigger_options map<text, text>,"
- + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaUserTypesTable =
- compile(SCHEMA_USER_TYPES_TABLE, "user defined type definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "type_name text,"
- + "field_names list<text>,"
- + "field_types list<text>,"
- + "PRIMARY KEY ((keyspace_name), type_name))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaFunctionsTable =
- compile(SCHEMA_FUNCTIONS_TABLE, "user defined function definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "function_name text,"
- + "signature blob,"
- + "argument_names list<text>,"
- + "argument_types list<text>,"
- + "body text,"
- + "deterministic boolean,"
- + "language text,"
- + "return_type text,"
- + "PRIMARY KEY ((keyspace_name), function_name, signature))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData SchemaAggregatesTable =
- compile(SCHEMA_AGGREGATES_TABLE, "user defined aggregate definitions",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "aggregate_name text,"
- + "signature blob,"
- + "argument_types list<text>,"
- + "return_type text,"
- + "state_func text,"
- + "state_type text,"
- + "final_func text,"
- + "initcond blob,"
- + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))")
- .gcGraceSeconds(WEEK);
-
- public static final CFMetaData BuiltIndexesTable =
- compile(BUILT_INDEXES_TABLE, "built column indexes",
- "CREATE TABLE \"%s\" ("
- + "table_name text,"
- + "index_name text,"
- + "PRIMARY KEY ((table_name), index_name)) "
- + "WITH COMPACT STORAGE");
-
- public static final CFMetaData HintsTable =
- compile(HINTS_TABLE, "hints awaiting delivery",
+ public static final String HINTS = "hints";
+ public static final String BATCHLOG = "batchlog";
+ public static final String PAXOS = "paxos";
+ public static final String BUILT_INDEXES = "IndexInfo";
+ public static final String LOCAL = "local";
+ public static final String PEERS = "peers";
+ public static final String PEER_EVENTS = "peer_events";
+ public static final String RANGE_XFERS = "range_xfers";
+ public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress";
+ public static final String COMPACTION_HISTORY = "compaction_history";
+ public static final String SSTABLE_ACTIVITY = "sstable_activity";
+
+ public static final CFMetaData Hints =
+ compile(HINTS,
+ "hints awaiting delivery",
"CREATE TABLE %s ("
+ "target_id uuid,"
+ "hint_id timeuuid,"
@@ -229,8 +89,9 @@ public final class SystemKeyspace
.compactionStrategyOptions(Collections.singletonMap("enabled", "false"))
.gcGraceSeconds(0);
- public static final CFMetaData BatchlogTable =
- compile(BATCHLOG_TABLE, "batches awaiting replay",
+ public static final CFMetaData Batchlog =
+ compile(BATCHLOG,
+ "batches awaiting replay",
"CREATE TABLE %s ("
+ "id uuid,"
+ "data blob,"
@@ -240,8 +101,9 @@ public final class SystemKeyspace
.compactionStrategyOptions(Collections.singletonMap("min_threshold", "2"))
.gcGraceSeconds(0);
- private static final CFMetaData PaxosTable =
- compile(PAXOS_TABLE, "in-progress paxos proposals",
+ private static final CFMetaData Paxos =
+ compile(PAXOS,
+ "in-progress paxos proposals",
"CREATE TABLE %s ("
+ "row_key blob,"
+ "cf_id UUID,"
@@ -253,8 +115,19 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((row_key), cf_id))")
.compactionStrategyClass(LeveledCompactionStrategy.class);
- private static final CFMetaData LocalTable =
- compile(LOCAL_TABLE, "information about the local node",
+ // TODO: make private
+ public static final CFMetaData BuiltIndexes =
+ compile(BUILT_INDEXES,
+ "built column indexes",
+ "CREATE TABLE \"%s\" ("
+ + "table_name text,"
+ + "index_name text,"
+ + "PRIMARY KEY ((table_name), index_name)) "
+ + "WITH COMPACT STORAGE");
+
+ private static final CFMetaData Local =
+ compile(LOCAL,
+ "information about the local node",
"CREATE TABLE %s ("
+ "key text,"
+ "bootstrapped text,"
@@ -273,8 +146,9 @@ public final class SystemKeyspace
+ "truncated_at map<uuid, blob>,"
+ "PRIMARY KEY ((key)))");
- private static final CFMetaData PeersTable =
- compile(PEERS_TABLE, "information about known peers in the cluster",
+ private static final CFMetaData Peers =
+ compile(PEERS,
+ "information about known peers in the cluster",
"CREATE TABLE %s ("
+ "peer inet,"
+ "data_center text,"
@@ -287,22 +161,25 @@ public final class SystemKeyspace
+ "tokens set<varchar>,"
+ "PRIMARY KEY ((peer)))");
- private static final CFMetaData PeerEventsTable =
- compile(PEER_EVENTS_TABLE, "events related to peers",
+ private static final CFMetaData PeerEvents =
+ compile(PEER_EVENTS,
+ "events related to peers",
"CREATE TABLE %s ("
+ "peer inet,"
+ "hints_dropped map<uuid, int>,"
+ "PRIMARY KEY ((peer)))");
- private static final CFMetaData RangeXfersTable =
- compile(RANGE_XFERS_TABLE, "ranges requested for transfer",
+ private static final CFMetaData RangeXfers =
+ compile(RANGE_XFERS,
+ "ranges requested for transfer",
"CREATE TABLE %s ("
+ "token_bytes blob,"
+ "requested_at timestamp,"
+ "PRIMARY KEY ((token_bytes)))");
- private static final CFMetaData CompactionLogTable =
- compile(COMPACTION_LOG_TABLE, "unfinished compactions",
+ private static final CFMetaData CompactionsInProgress =
+ compile(COMPACTIONS_IN_PROGRESS,
+ "unfinished compactions",
"CREATE TABLE %s ("
+ "id uuid,"
+ "columnfamily_name text,"
@@ -310,8 +187,9 @@ public final class SystemKeyspace
+ "keyspace_name text,"
+ "PRIMARY KEY ((id)))");
- private static final CFMetaData CompactionHistoryTable =
- compile(COMPACTION_HISTORY_TABLE, "week-long compaction history",
+ private static final CFMetaData CompactionHistory =
+ compile(COMPACTION_HISTORY,
+ "week-long compaction history",
"CREATE TABLE %s ("
+ "id uuid,"
+ "bytes_in bigint,"
@@ -321,10 +199,11 @@ public final class SystemKeyspace
+ "keyspace_name text,"
+ "rows_merged map<int, bigint>,"
+ "PRIMARY KEY ((id)))")
- .defaultTimeToLive(WEEK);
+ .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7));
- private static final CFMetaData SSTableActivityTable =
- compile(SSTABLE_ACTIVITY_TABLE, "historic sstable read rates",
+ private static final CFMetaData SSTableActivity =
+ compile(SSTABLE_ACTIVITY,
+ "historic sstable read rates",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "columnfamily_name text,"
@@ -333,37 +212,29 @@ public final class SystemKeyspace
+ "rate_15m double,"
+ "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))");
- private static CFMetaData compile(String table, String comment, String cql)
+ private static CFMetaData compile(String name, String description, String schema)
{
- return CFMetaData.compile(String.format(cql, table), NAME).comment(comment);
+ return CFMetaData.compile(String.format(schema, name), NAME).comment(description);
}
public static KSMetaData definition()
{
- List<CFMetaData> tables =
- Arrays.asList(SchemaKeyspacesTable,
- SchemaColumnFamiliesTable,
- SchemaColumnsTable,
- SchemaTriggersTable,
- SchemaUserTypesTable,
- SchemaFunctionsTable,
- SchemaAggregatesTable,
- BuiltIndexesTable,
- HintsTable,
- BatchlogTable,
- PaxosTable,
- LocalTable,
- PeersTable,
- PeerEventsTable,
- RangeXfersTable,
- CompactionLogTable,
- CompactionHistoryTable,
- SSTableActivityTable);
+ Iterable<CFMetaData> tables =
+ Iterables.concat(LegacySchemaTables.All,
+ Arrays.asList(BuiltIndexes,
+ Hints,
+ Batchlog,
+ Paxos,
+ Local,
+ Peers,
+ PeerEvents,
+ RangeXfers,
+ CompactionsInProgress,
+ CompactionHistory,
+ SSTableActivity));
return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
}
- private static final String LOCAL_KEY = "local";
-
private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
public enum BootstrapState
@@ -381,24 +252,15 @@ public final class SystemKeyspace
public static void finishStartup()
{
setupVersion();
-
- // add entries to system schema columnfamilies for the hardcoded system definitions
- KSMetaData ksmd = Schema.instance.getKSMetaData(NAME);
-
- // delete old, possibly obsolete entries in schema tables
- for (String table : ALL_SCHEMA_TABLES)
- executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", table), ksmd.name);
-
- // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
- ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply();
+ LegacySchemaTables.saveSystemKeyspaceSchema();
}
private static void setupVersion()
{
String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- executeOnceInternal(String.format(req, LOCAL_TABLE),
- LOCAL_KEY,
+ executeOnceInternal(String.format(req, LOCAL),
+ LOCAL,
FBUtilities.getReleaseVersionString(),
QueryProcessor.CQL_VERSION.toString(),
cassandraConstants.VERSION,
@@ -429,8 +291,8 @@ public final class SystemKeyspace
}
});
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
- executeInternal(String.format(req, COMPACTION_LOG_TABLE), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
- forceBlockingFlush(COMPACTION_LOG_TABLE);
+ executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
+ forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
return compactionId;
}
@@ -443,8 +305,8 @@ public final class SystemKeyspace
{
assert taskId != null;
- executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTION_LOG_TABLE), taskId);
- forceBlockingFlush(COMPACTION_LOG_TABLE);
+ executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
+ forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
}
/**
@@ -454,7 +316,7 @@ public final class SystemKeyspace
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
String req = "SELECT * FROM system.%s";
- UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTION_LOG_TABLE));
+ UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
@@ -479,7 +341,7 @@ public final class SystemKeyspace
public static void discardCompactionsInProgress()
{
- ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTION_LOG_TABLE);
+ ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
compactionLog.truncateBlocking();
}
@@ -491,24 +353,24 @@ public final class SystemKeyspace
Map<Integer, Long> rowsMerged)
{
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
- if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_TABLE))
+ if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
return;
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
- executeInternal(String.format(req, COMPACTION_HISTORY_TABLE), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
+ executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
}
public static TabularData getCompactionHistory() throws OpenDataException
{
- UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY_TABLE));
+ UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
return CompactionHistoryTabularData.from(queryResultSet);
}
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), truncationAsMapEntry(cfs, truncatedAt, position));
+ executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
- forceBlockingFlush(LOCAL_TABLE);
+ forceBlockingFlush(LOCAL);
}
/**
@@ -517,9 +379,9 @@ public final class SystemKeyspace
public static synchronized void removeTruncationRecord(UUID cfId)
{
String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), cfId);
+ executeInternal(String.format(req, LOCAL, LOCAL), cfId);
truncationRecords = null;
- forceBlockingFlush(LOCAL_TABLE);
+ forceBlockingFlush(LOCAL);
}
private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
@@ -558,7 +420,7 @@ public final class SystemKeyspace
private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
- UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
@@ -597,14 +459,14 @@ public final class SystemKeyspace
}
String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
- executeInternal(String.format(req, PEERS_TABLE), ep, tokensAsSet(tokens));
+ executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
}
public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
- executeInternal(String.format(req, PEERS_TABLE), ep, preferred_ip);
- forceBlockingFlush(PEERS_TABLE);
+ executeInternal(String.format(req, PEERS), ep, preferred_ip);
+ forceBlockingFlush(PEERS);
}
public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
@@ -613,20 +475,20 @@ public final class SystemKeyspace
return;
String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
- executeInternal(String.format(req, PEERS_TABLE, columnName), ep, value);
+ executeInternal(String.format(req, PEERS, columnName), ep, value);
}
public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
{
// with 30 day TTL
String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
- executeInternal(String.format(req, PEER_EVENTS_TABLE), timePeriod, value, ep);
+ executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, ep);
}
public static synchronized void updateSchemaVersion(UUID version)
{
String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), version);
+ executeInternal(String.format(req, LOCAL, LOCAL), version);
}
private static Set<String> tokensAsSet(Collection<Token> tokens)
@@ -653,7 +515,7 @@ public final class SystemKeyspace
public static synchronized void removeEndpoint(InetAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
- executeInternal(String.format(req, PEERS_TABLE), ep);
+ executeInternal(String.format(req, PEERS), ep);
}
/**
@@ -663,8 +525,8 @@ public final class SystemKeyspace
{
assert !tokens.isEmpty() : "removeEndpoint should be used instead";
String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), tokensAsSet(tokens));
- forceBlockingFlush(LOCAL_TABLE);
+ executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens));
+ forceBlockingFlush(LOCAL);
}
/**
@@ -696,7 +558,7 @@ public final class SystemKeyspace
public static SetMultimap<InetAddress, Token> loadTokens()
{
SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
- for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS_TABLE))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("tokens"))
@@ -713,7 +575,7 @@ public final class SystemKeyspace
public static Map<InetAddress, UUID> loadHostIds()
{
Map<InetAddress, UUID> hostIdMap = new HashMap<>();
- for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS_TABLE))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("host_id"))
@@ -733,7 +595,7 @@ public final class SystemKeyspace
public static InetAddress getPreferredIP(InetAddress ep)
{
String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
- UntypedResultSet result = executeInternal(String.format(req, PEERS_TABLE), ep);
+ UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
if (!result.isEmpty() && result.one().has("preferred_ip"))
return result.one().getInetAddress("preferred_ip");
return ep;
@@ -745,7 +607,7 @@ public final class SystemKeyspace
public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
{
Map<InetAddress, Map<String, String>> result = new HashMap<>();
- for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS_TABLE))
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
{
InetAddress peer = row.getInetAddress("peer");
if (row.has("data_center") && row.has("rack"))
@@ -780,10 +642,10 @@ public final class SystemKeyspace
ex.initCause(err);
throw ex;
}
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_TABLE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL);
String req = "SELECT cluster_name FROM system.%s WHERE key='%s'";
- UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
if (result.isEmpty() || !result.one().has("cluster_name"))
{
@@ -793,7 +655,7 @@ public final class SystemKeyspace
// no system files. this is a new node.
req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), DatabaseDescriptor.getClusterName());
+ executeInternal(String.format(req, LOCAL, LOCAL), DatabaseDescriptor.getClusterName());
return;
}
@@ -805,7 +667,7 @@ public final class SystemKeyspace
public static Collection<Token> getSavedTokens()
{
String req = "SELECT tokens FROM system.%s WHERE key='%s'";
- UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
return result.isEmpty() || !result.one().has("tokens")
? Collections.<Token>emptyList()
: deserializeTokens(result.one().getSet("tokens", UTF8Type.instance));
@@ -814,7 +676,7 @@ public final class SystemKeyspace
public static int incrementAndGetGeneration()
{
String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'";
- UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
int generation;
if (result.isEmpty() || !result.one().has("gossip_generation"))
@@ -842,8 +704,8 @@ public final class SystemKeyspace
}
req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), generation);
- forceBlockingFlush(LOCAL_TABLE);
+ executeInternal(String.format(req, LOCAL, LOCAL), generation);
+ forceBlockingFlush(LOCAL);
return generation;
}
@@ -851,7 +713,7 @@ public final class SystemKeyspace
public static BootstrapState getBootstrapState()
{
String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'";
- UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
if (result.isEmpty() || !result.one().has("bootstrapped"))
return BootstrapState.NEEDS_BOOTSTRAP;
@@ -872,15 +734,15 @@ public final class SystemKeyspace
public static void setBootstrapState(BootstrapState state)
{
String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), state.name());
- forceBlockingFlush(LOCAL_TABLE);
+ executeInternal(String.format(req, LOCAL, LOCAL), state.name());
+ forceBlockingFlush(LOCAL);
}
public static boolean isIndexBuilt(String keyspaceName, String indexName)
{
- ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES_TABLE);
+ ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
- BUILT_INDEXES_TABLE,
+ BUILT_INDEXES,
FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
System.currentTimeMillis());
return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
@@ -888,7 +750,7 @@ public final class SystemKeyspace
public static void setIndexBuilt(String keyspaceName, String indexName)
{
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES_TABLE);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES);
cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply();
}
@@ -896,7 +758,7 @@ public final class SystemKeyspace
public static void setIndexRemoved(String keyspaceName, String indexName)
{
Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName));
- mutation.delete(BUILT_INDEXES_TABLE, BuiltIndexesTable.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
+ mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
mutation.apply();
}
@@ -907,7 +769,7 @@ public final class SystemKeyspace
public static UUID getLocalHostId()
{
String req = "SELECT host_id FROM system.%s WHERE key='%s'";
- UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY));
+ UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
// Look up the Host UUID (return it if found)
if (!result.isEmpty() && result.one().has("host_id"))
@@ -925,144 +787,14 @@ public final class SystemKeyspace
public static UUID setLocalHostId(UUID hostId)
{
String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)";
- executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), hostId);
+ executeInternal(String.format(req, LOCAL, LOCAL), hostId);
return hostId;
}
- /**
- * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
- * @return CFS responsible to hold low-level serialized schema
- */
- public static ColumnFamilyStore schemaCFS(String cfName)
- {
- return Keyspace.open(NAME).getColumnFamilyStore(cfName);
- }
-
- public static List<Row> serializedSchema()
- {
- List<Row> schema = new ArrayList<>();
-
- for (String cf : ALL_SCHEMA_TABLES)
- schema.addAll(serializedSchema(cf));
-
- return schema;
- }
-
- /**
- * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
- * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
- */
- public static List<Row> serializedSchema(String schemaCfName)
- {
- Token minToken = StorageService.getPartitioner().getMinimumToken();
-
- return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
- null,
- new IdentityQueryFilter(),
- Integer.MAX_VALUE,
- System.currentTimeMillis());
- }
-
- public static Collection<Mutation> serializeSchema()
- {
- Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
-
- for (String cf : ALL_SCHEMA_TABLES)
- serializeSchema(mutationMap, cf);
-
- return mutationMap.values();
- }
-
- private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName)
- {
- for (Row schemaRow : serializedSchema(schemaCfName))
- {
- if (Schema.ignoredSchemaRow(schemaRow))
- continue;
-
- Mutation mutation = mutationMap.get(schemaRow.key);
- if (mutation == null)
- {
- mutation = new Mutation(NAME, schemaRow.key.getKey());
- mutationMap.put(schemaRow.key, mutation);
- }
-
- mutation.add(schemaRow.cf);
- }
- }
-
- public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName)
- {
- Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
-
- for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName))
- schema.put(schemaEntity.key, schemaEntity.cf);
-
- return schema;
- }
-
- public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces)
- {
- Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
-
- for (String keyspace : keyspaces)
- {
- Row schemaEntity = readSchemaRow(schemaCfName, keyspace);
- if (schemaEntity.cf != null)
- schema.put(schemaEntity.key, schemaEntity.cf);
- }
-
- return schema;
- }
-
- public static ByteBuffer getSchemaKSKey(String ksName)
- {
- return AsciiType.instance.fromString(ksName);
- }
-
- /**
- * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace.
- *
- * @param schemaCfName the schema table to get the data from (schema_keyspaces, schema_columnfamilies, schema_columns or schema_triggers)
- * @param ksName the keyspace of the tables we are interested in
- * @return a Row containing the schema data of a particular type for the keyspace
- */
- public static Row readSchemaRow(String schemaCfName, String ksName)
- {
- DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
-
- ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
- ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, schemaCfName, System.currentTimeMillis()));
-
- return new Row(key, result);
- }
-
- /**
- * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair.
- *
- * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers)
- * @param ksName the keyspace of the table we are interested in
- * @param cfName the table we are interested in
- * @return a Row containing the schema data of a particular type for the table
- */
- public static Row readSchemaRow(String schemaCfName, String ksName, String cfName)
- {
- DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
- ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
- Composite prefix = schemaCFS.getComparator().make(cfName);
- ColumnFamily cf = schemaCFS.getColumnFamily(key,
- prefix,
- prefix.end(),
- false,
- Integer.MAX_VALUE,
- System.currentTimeMillis());
- return new Row(key, cf);
- }
-
public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
{
String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
- UntypedResultSet results = executeInternal(String.format(req, PAXOS_TABLE), key, metadata.cfId);
+ UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
@@ -1083,7 +815,7 @@ public final class SystemKeyspace
public static void savePaxosPromise(Commit promise)
{
String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
- executeInternal(String.format(req, PAXOS_TABLE),
+ executeInternal(String.format(req, PAXOS),
UUIDGen.microsTimestamp(promise.ballot),
paxosTtl(promise.update.metadata),
promise.ballot,
@@ -1093,7 +825,7 @@ public final class SystemKeyspace
public static void savePaxosProposal(Commit proposal)
{
- executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS_TABLE),
+ executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
UUIDGen.microsTimestamp(proposal.ballot),
paxosTtl(proposal.update.metadata),
proposal.ballot,
@@ -1113,7 +845,7 @@ public final class SystemKeyspace
// We always erase the last proposal (with the commit timestamp, so as not to erase a more recent proposal in case the commit is old)
// even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
- executeInternal(String.format(cql, PAXOS_TABLE),
+ executeInternal(String.format(cql, PAXOS),
UUIDGen.microsTimestamp(commit.ballot),
paxosTtl(commit.update.metadata),
commit.ballot,
@@ -1132,7 +864,7 @@ public final class SystemKeyspace
public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation)
{
String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?";
- UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation);
+ UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
if (results.isEmpty())
return new RestorableMeter();
@@ -1150,7 +882,7 @@ public final class SystemKeyspace
{
// Store values with a ten-day TTL (864000 seconds) to handle corner cases where cleanup might not occur
String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000";
- executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE),
+ executeInternal(String.format(cql, SSTABLE_ACTIVITY),
keyspace,
table,
generation,
@@ -1164,6 +896,6 @@ public final class SystemKeyspace
public static void clearSSTableReadMeter(String keyspace, String table, int generation)
{
String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?";
- executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation);
+ executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 0b52904..8be9a18 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,17 +24,19 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.*;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -228,17 +230,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
{
partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
// get CF meta data
- String query = "SELECT comparator," +
- " subcomparator," +
- " type " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s' " +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, cfName)),
- Compression.NONE,
- ConsistencyLevel.ONE);
+ String query = String.format("SELECT comparator, subcomparator, type " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ cfName);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
Iterator<CqlRow> iteraRow = result.rows.iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 21e30e2..ffaaea9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -22,17 +22,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -40,10 +30,7 @@ import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-
import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +42,8 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -601,8 +590,15 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private void fetchKeys()
{
- String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
- keyspace + "' and columnfamily_name='" + cfName + "'";
+ String query = String.format("SELECT column_name, component_index, type " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNS,
+ keyspace,
+ cfName);
+
+ // get CF meta data
List<Row> rows = session.execute(query).all();
if (rows.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 311359a..0956ba5 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
@@ -297,10 +300,6 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
{
result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
}
- catch (InvalidRequestException e)
- {
- throw new RuntimeException("failed to prepare cql query " + cql, e);
- }
catch (TException e)
{
throw new RuntimeException("failed to prepare cql query " + cql, e);
@@ -331,18 +330,20 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
return partitionKey;
}
+ // FIXME
/** retrieve the key validator from system.schema_columnfamilies table */
private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
String cfName = ConfigHelper.getOutputColumnFamily(conf);
- String query = "SELECT key_validator," +
- " key_aliases," +
- " column_aliases " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name='%s' and columnfamily_name='%s'";
- String formatted = String.format(query, keyspace, cfName);
- CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);
+ String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' and columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ cfName);
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
Column rawKeyValidator = result.rows.get(0).columns.get(0);
String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index dc37252..04d207f 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -26,6 +26,8 @@ import java.nio.charset.CharacterCodingException;
import java.util.*;
import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.auth.IAuthenticator;
@@ -585,20 +587,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
IOException
{
// get CF meta data
- String query = "SELECT type," +
- " comparator," +
- " subcomparator," +
- " default_validator," +
- " key_validator," +
- " key_aliases " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s' " +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
+ String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
if (result == null || result.rows == null || result.rows.isEmpty())
return null;
@@ -657,18 +654,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
ConfigurationException,
NotFoundException
{
- String query = "SELECT column_name, " +
- " validator, " +
- " index_type, " +
- " type " +
- "FROM system.schema_columns " +
- "WHERE keyspace_name = '%s' " +
- " AND columnfamily_name = '%s'";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
+ String query = String.format("SELECT column_name, validator, index_type, type " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNS,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
List<CqlRow> rows = result.rows;
List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 6cd5c66..fca1d43 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -25,6 +25,8 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.marshal.*;
@@ -482,21 +484,15 @@ public class CqlStorage extends AbstractCassandraStorage
protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
throws Exception
{
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
+ String query = String.format("SELECT key_aliases, column_aliases, key_validator, comparator, keyspace_name, value_alias, default_validator " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+ SystemKeyspace.NAME,
+ LegacySchemaTables.COLUMNFAMILIES,
+ keyspace,
+ column_family);
+
+ CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
if (result == null || result.rows == null || result.rows.isEmpty())
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 43cd2c0..ec590f3 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -349,16 +349,16 @@ public class CQLSSTableWriter implements Closeable
if (ksm == null)
{
ksm = KSMetaData.newKeyspace(this.schema.ksName,
- AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
- ImmutableMap.of("replication_factor", "1"),
- true,
- Collections.singleton(this.schema));
+ AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+ ImmutableMap.of("replication_factor", "1"),
+ true,
+ Collections.singleton(this.schema));
Schema.instance.load(ksm);
}
else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null)
{
Schema.instance.load(this.schema);
- ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema)));
+ ksm = ksm.cloneWithTableAdded(this.schema);
Schema.instance.setKeyspaceDefinition(ksm);
Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false);
}