You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/27 20:36:42 UTC
[06/11] Rename Table to Keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0286bd3..c57d01e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
@@ -189,7 +189,7 @@ public class StorageProxy implements StorageProxyMBean
* values) between the prepare and accept phases. This gives us a slightly longer window for another
* coordinator to come along and trump our own promise with a newer one but is otherwise safe.
*
- * @param table the table for the CAS
+ * @param keyspaceName the keyspace for the CAS
* @param cfName the column family for the CAS
* @param key the row key for the row to CAS
* @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected}
@@ -203,19 +203,19 @@ public class StorageProxy implements StorageProxyMBean
* expected (since, if the CAS doesn't succeed, it means the current value does not match the one in expected). If
* expected == null and the CAS is unsuccessful, the first live column of the CF is returned.
*/
- public static ColumnFamily cas(String table, String cfName, ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel)
+ public static ColumnFamily cas(String keyspaceName, String cfName, ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel)
throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
{
- consistencyLevel.validateForCas(table);
+ consistencyLevel.validateForCas(keyspaceName);
- CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
long start = System.nanoTime();
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
while (System.nanoTime() - start < timeout)
{
// for simplicity, we'll do a single liveness check at the start of each attempt
- Pair<List<InetAddress>, Integer> p = getPaxosParticipants(table, key);
+ Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key);
List<InetAddress> liveEndpoints = p.left;
int requiredParticipants = p.right;
@@ -232,11 +232,11 @@ public class StorageProxy implements StorageProxyMBean
SliceQueryFilter filter = prefix == null
? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
: new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount());
- readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
+ readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
}
else
{
- readCommand = new SliceByNamesReadCommand(table, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+ readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
}
List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
ColumnFamily current = rows.get(0).cf;
@@ -303,11 +303,11 @@ public class StorageProxy implements StorageProxyMBean
return true;
}
- private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String table, ByteBuffer key) throws UnavailableException
+ private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key) throws UnavailableException
{
Token tk = StorageService.getPartitioner().getToken(key);
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
int requiredParticipants = pendingEndpoints.size() + 1 + naturalEndpoints.size() / 2; // See CASSANDRA-833
List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
if (liveEndpoints.size() < requiredParticipants)
@@ -415,13 +415,13 @@ public class StorageProxy implements StorageProxyMBean
private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException
{
- Table table = Table.open(proposal.update.metadata().ksName);
+ Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
Token tk = StorageService.getPartitioner().getToken(proposal.key);
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table.getName(), tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table.getName());
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());
- AbstractReplicationStrategy rs = table.getReplicationStrategy();
+ AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -584,7 +584,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
Collections.<InetAddress>emptyList(),
ConsistencyLevel.ONE,
- Table.open(Table.SYSTEM_KS),
+ Keyspace.open(Keyspace.SYSTEM_KS),
null,
WriteType.BATCH_LOG);
updateBatchlog(rm, endpoints, handler);
@@ -593,15 +593,15 @@ public class StorageProxy implements StorageProxyMBean
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
{
- ColumnFamily cf = EmptyColumns.factory.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS, SystemTable.BATCHLOG_CF));
+ ColumnFamily cf = EmptyColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF));
cf.delete(new DeletionInfo(FBUtilities.timestampMicros(), (int) (System.currentTimeMillis() / 1000)));
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
Collections.<InetAddress>emptyList(),
ConsistencyLevel.ANY,
- Table.open(Table.SYSTEM_KS),
+ Keyspace.open(Keyspace.SYSTEM_KS),
null,
WriteType.SIMPLE);
- RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
+ RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
updateBatchlog(rm, endpoints, handler);
}
@@ -659,12 +659,12 @@ public class StorageProxy implements StorageProxyMBean
WriteType writeType)
throws UnavailableException, OverloadedException
{
- String table = mutation.getTable();
- AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+ String keyspaceName = mutation.getKeyspaceName();
+ AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
@@ -678,11 +678,11 @@ public class StorageProxy implements StorageProxyMBean
// same as above except does not initiate writes (but does perform availability checks).
private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
{
- AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
- String table = mutation.getTable();
+ AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
+ String keyspaceName = mutation.getKeyspaceName();
Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
return new WriteResponseHandlerWrapper(responseHandler, mutation);
}
@@ -939,7 +939,7 @@ public class StorageProxy implements StorageProxyMBean
*/
public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
{
- InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency());
+ InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
@@ -948,11 +948,11 @@ public class StorageProxy implements StorageProxyMBean
else
{
// Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
- String table = cm.getTable();
- AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+ String keyspaceName = cm.getKeyspaceName();
+ AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
Token tk = StorageService.getPartitioner().getToken(cm.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
@@ -975,14 +975,14 @@ public class StorageProxy implements StorageProxyMBean
* is unclear we want to mix those latencies with read latencies, so this
* may be a bit involved.
*/
- private static InetAddress findSuitableEndpoint(String tableName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
+ private static InetAddress findSuitableEndpoint(String keyspaceName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
{
- Table table = Table.open(tableName);
+ Keyspace keyspace = Keyspace.open(keyspaceName);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
+ List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, key);
if (endpoints.isEmpty())
// TODO have a way to compute the consistency level
- throw new UnavailableException(cl, cl.blockFor(table), 0);
+ throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
for (InetAddress endpoint : endpoints)
@@ -1054,10 +1054,10 @@ public class StorageProxy implements StorageProxyMBean
};
}
- private static boolean systemTableQuery(List<ReadCommand> cmds)
+ private static boolean systemKeyspaceQuery(List<ReadCommand> cmds)
{
for (ReadCommand cmd : cmds)
- if (!cmd.table.equals(Table.SYSTEM_KS))
+ if (!cmd.ksName.equals(Keyspace.SYSTEM_KS))
return false;
return true;
}
@@ -1069,7 +1069,7 @@ public class StorageProxy implements StorageProxyMBean
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException, WriteTimeoutException
{
- if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands))
+ if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
{
readMetrics.unavailables.mark();
ClientRequestMetrics.readUnavailables.inc();
@@ -1087,13 +1087,13 @@ public class StorageProxy implements StorageProxyMBean
throw new InvalidRequestException("SERIAL consistency may only be requested for one row at a time");
ReadCommand command = commands.get(0);
- CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+ CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
long start = System.nanoTime();
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
while (true)
{
- Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.table, command.key);
+ Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key);
List<InetAddress> liveEndpoints = p.left;
int requiredParticipants = p.right;
@@ -1192,7 +1192,7 @@ public class StorageProxy implements StorageProxyMBean
ReadRepairMetrics.repairedBlocking.mark();
// Do a full data read to resolve the correct response (and repair nodes that need it)
- RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
+ RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp);
ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
if (repairCommands == null)
@@ -1242,7 +1242,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (TimeoutException e)
{
- int blockFor = consistency_level.blockFor(Table.open(command.getKeyspace()));
+ int blockFor = consistency_level.blockFor(Keyspace.open(command.getKeyspace()));
throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
}
@@ -1284,8 +1284,8 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- Table table = Table.open(command.table);
- Row r = command.getRow(table);
+ Keyspace keyspace = Keyspace.open(command.ksName);
+ Row r = command.getRow(keyspace);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
handler.response(result);
@@ -1313,14 +1313,14 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
+ public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key)
{
- return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
+ return getLiveSortedEndpoints(keyspace, StorageService.instance.getPartitioner().decorateKey(key));
}
- private static List<InetAddress> getLiveSortedEndpoints(Table table, RingPosition pos)
+ private static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos)
{
- List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(table, pos);
+ List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
return liveEndpoints;
}
@@ -1343,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Determining replicas to query");
long startTime = System.nanoTime();
- Table table = Table.open(command.keyspace);
+ Keyspace keyspace = Keyspace.open(command.keyspace);
List<Row> rows;
// now scan until we have enough results
try
@@ -1361,10 +1361,10 @@ public class StorageProxy implements StorageProxyMBean
? ranges.get(i)
: nextRange;
List<InetAddress> liveEndpoints = nextEndpoints == null
- ? getLiveSortedEndpoints(table, range.right)
+ ? getLiveSortedEndpoints(keyspace, range.right)
: nextEndpoints;
List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
- ? consistency_level.filterForQuery(table, liveEndpoints)
+ ? consistency_level.filterForQuery(keyspace, liveEndpoints)
: nextFilteredEndpoints;
++i;
@@ -1374,8 +1374,8 @@ public class StorageProxy implements StorageProxyMBean
while (i < ranges.size())
{
nextRange = ranges.get(i);
- nextEndpoints = getLiveSortedEndpoints(table, nextRange.right);
- nextFilteredEndpoints = consistency_level.filterForQuery(table, nextEndpoints);
+ nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
+ nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
/*
* If the current range right is the min token, we should stop merging because CFS.getRangeSlice
@@ -1390,10 +1390,10 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
// Check if there are enough endpoints for the merge to be possible.
- if (!consistency_level.isSufficientLiveNodes(table, merged))
+ if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
break;
- List<InetAddress> filteredMerged = consistency_level.filterForQuery(table, merged);
+ List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
// Estimate whether merging will be a win or not
if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
@@ -1443,7 +1443,7 @@ public class StorageProxy implements StorageProxyMBean
{
logger.debug("Range slice timeout: {}", ex.toString());
// We actually got all responses at that point
- int blockFor = consistency_level.blockFor(table);
+ int blockFor = consistency_level.blockFor(keyspace);
throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
}
catch (DigestMismatchException e)