Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/27 20:36:42 UTC

[06/11] Rename Table to Keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e96e585/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0286bd3..c57d01e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
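
For code outside this file the rename is mechanical: Keyspace keeps the same open() factory and accessors that Table had, only the names change. A minimal before/after sketch, assuming a hypothetical caller and keyspace name (the types and methods are the ones touched in this diff):

    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.locator.AbstractReplicationStrategy;

    public class RenameCallerSketch
    {
        public static void main(String[] args)
        {
            // Before this patch the same lookup read: Table ks = Table.open("my_keyspace");
            Keyspace ks = Keyspace.open("my_keyspace"); // "my_keyspace" is a made-up name
            AbstractReplicationStrategy rs = ks.getReplicationStrategy();
            System.out.println(ks.getName() + " -> " + rs.getClass().getSimpleName());
        }
    }
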
@@ -189,7 +189,7 @@ public class StorageProxy implements StorageProxyMBean
      *  values) between the prepare and accept phases.  This gives us a slightly longer window for another
      *  coordinator to come along and trump our own promise with a newer one but is otherwise safe.
      *
-     * @param table the table for the CAS
+     * @param keyspaceName the keyspace for the CAS
      * @param cfName the column family for the CAS
      * @param key the row key for the row to CAS
      * @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected}
@@ -203,19 +203,19 @@ public class StorageProxy implements StorageProxyMBean
      * expected (since, if the CAS doesn't succeed, it means the current value does not match the one in expected). If
      * expected == null and the CAS is unsuccessful, the first live column of the CF is returned.
      */
-    public static ColumnFamily cas(String table, String cfName, ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel)
+    public static ColumnFamily cas(String keyspaceName, String cfName, ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel)
     throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
-        consistencyLevel.validateForCas(table);
+        consistencyLevel.validateForCas(keyspaceName);
 
-        CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
 
         long start = System.nanoTime();
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
         while (System.nanoTime() - start < timeout)
         {
             // for simplicity, we'll do a single liveness check at the start of each attempt
-            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(table, key);
+            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key);
             List<InetAddress> liveEndpoints = p.left;
             int requiredParticipants = p.right;
 
@@ -232,11 +232,11 @@ public class StorageProxy implements StorageProxyMBean
                 SliceQueryFilter filter = prefix == null
                                         ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
                                         : new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount());
-                readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
+                readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
             }
             else
             {
-                readCommand = new SliceByNamesReadCommand(table, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+                readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
             }
             List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
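
Only the first parameter of cas() is renamed, so callers keep the shape described in the javadoc above. A rough caller-side sketch under the new signature; the keyspace and column family names, the choice of ConsistencyLevel.QUORUM, and reading a null result as "applied" are assumptions for illustration, not taken from this hunk:

    import java.nio.ByteBuffer;
    import org.apache.cassandra.db.ColumnFamily;
    import org.apache.cassandra.db.ConsistencyLevel;
    import org.apache.cassandra.service.StorageProxy;

    public final class CasCallerSketch
    {
        // Hypothetical helper: apply 'updates' only if the row currently has no live columns.
        static boolean insertIfAbsent(ByteBuffer key, ColumnFamily updates) throws Exception
        {
            ColumnFamily current = StorageProxy.cas("my_ks",   // hypothetical keyspace name
                                                    "my_cf",   // hypothetical column family name
                                                    key,
                                                    null,      // no CQL3 prefix: check the whole row
                                                    null,      // expected == null: apply only if nothing is live
                                                    updates,
                                                    ConsistencyLevel.QUORUM); // assumed to pass validateForCas here
            return current == null;   // a non-null result would carry the live columns that blocked the update
        }
    }
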
@@ -303,11 +303,11 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
-    private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String table, ByteBuffer key) throws UnavailableException
+    private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key) throws UnavailableException
     {
         Token tk = StorageService.getPartitioner().getToken(key);
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
         int requiredParticipants = pendingEndpoints.size() + 1 + naturalEndpoints.size() / 2; // See CASSANDRA-833
         List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
         if (liveEndpoints.size() < requiredParticipants)
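
The participant arithmetic is untouched by the rename: a Paxos round still needs a majority of the natural replicas for the key, plus every pending endpoint moving into the range. A worked example of the formula above (the replica counts are assumptions for illustration):

    int naturalReplicas = 3;   // e.g. a keyspace with replication factor 3
    int pendingReplicas = 1;   // e.g. one node bootstrapping into this token range
    int requiredParticipants = pendingReplicas + 1 + naturalReplicas / 2;   // 1 + 1 + 1 = 3
    // With nothing pending this drops to 0 + 1 + 3/2 = 2, i.e. a plain majority of the three replicas.
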
@@ -415,13 +415,13 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException
     {
-        Table table = Table.open(proposal.update.metadata().ksName);
+        Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
 
         Token tk = StorageService.getPartitioner().getToken(proposal.key);
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table.getName(), tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table.getName());
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());
 
-        AbstractReplicationStrategy rs = table.getReplicationStrategy();
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -584,7 +584,7 @@ public class StorageProxy implements StorageProxyMBean
         AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ONE,
-                                                                        Table.open(Table.SYSTEM_KS),
+                                                                        Keyspace.open(Keyspace.SYSTEM_KS),
                                                                         null,
                                                                         WriteType.BATCH_LOG);
         updateBatchlog(rm, endpoints, handler);
@@ -593,15 +593,15 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
     {
-        ColumnFamily cf = EmptyColumns.factory.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS, SystemTable.BATCHLOG_CF));
+        ColumnFamily cf = EmptyColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF));
         cf.delete(new DeletionInfo(FBUtilities.timestampMicros(), (int) (System.currentTimeMillis() / 1000)));
         AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ANY,
-                                                                        Table.open(Table.SYSTEM_KS),
+                                                                        Keyspace.open(Keyspace.SYSTEM_KS),
                                                                         null,
                                                                         WriteType.SIMPLE);
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
+        RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
         updateBatchlog(rm, endpoints, handler);
     }
 
@@ -659,12 +659,12 @@ public class StorageProxy implements StorageProxyMBean
                                                             WriteType writeType)
     throws UnavailableException, OverloadedException
     {
-        String table = mutation.getTable();
-        AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+        String keyspaceName = mutation.getKeyspaceName();
+        AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
 
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
         AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
 
@@ -678,11 +678,11 @@ public class StorageProxy implements StorageProxyMBean
     // same as above except does not initiate writes (but does perform availability checks).
     private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
     {
-        AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
-        String table = mutation.getTable();
+        AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
+        String keyspaceName = mutation.getKeyspaceName();
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
         AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
         return new WriteResponseHandlerWrapper(responseHandler, mutation);
     }
@@ -939,7 +939,7 @@ public class StorageProxy implements StorageProxyMBean
      */
     public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
     {
-        InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency());
+        InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
         {
@@ -948,11 +948,11 @@ public class StorageProxy implements StorageProxyMBean
         else
         {
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
-            String table = cm.getTable();
-            AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+            String keyspaceName = cm.getKeyspaceName();
+            AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = StorageService.getPartitioner().getToken(cm.key());
-            List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
-            Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
+            List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+            Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
             rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
 
@@ -975,14 +975,14 @@ public class StorageProxy implements StorageProxyMBean
      * is unclear we want to mix those latencies with read latencies, so this
      * may be a bit involved.
      */
-    private static InetAddress findSuitableEndpoint(String tableName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
+    private static InetAddress findSuitableEndpoint(String keyspaceName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
     {
-        Table table = Table.open(tableName);
+        Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
+        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, key);
         if (endpoints.isEmpty())
             // TODO have a way to compute the consistency level
-            throw new UnavailableException(cl, cl.blockFor(table), 0);
+            throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
 
         List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
         for (InetAddress endpoint : endpoints)
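
blockFor() now takes a Keyspace rather than a Table, since the replica count it reports comes from the keyspace's replication settings. A small sketch of the unavailable path above; the keyspace name, the replication factor of 3 and the use of QUORUM are assumptions:

    import java.net.InetAddress;
    import java.util.List;
    import org.apache.cassandra.db.ConsistencyLevel;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.exceptions.UnavailableException;

    public final class BlockForSketch
    {
        // Hypothetical check mirroring the throw above.
        static void checkEnoughReplicas(List<InetAddress> live) throws UnavailableException
        {
            Keyspace keyspace = Keyspace.open("my_ks");                 // hypothetical keyspace, assume RF = 3
            int blockFor = ConsistencyLevel.QUORUM.blockFor(keyspace);  // 3/2 + 1 = 2 replicas must respond
            if (live.size() < blockFor)
                throw new UnavailableException(ConsistencyLevel.QUORUM, blockFor, live.size());
        }
    }
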
@@ -1054,10 +1054,10 @@ public class StorageProxy implements StorageProxyMBean
         };
     }
 
-    private static boolean systemTableQuery(List<ReadCommand> cmds)
+    private static boolean systemKeyspaceQuery(List<ReadCommand> cmds)
     {
         for (ReadCommand cmd : cmds)
-            if (!cmd.table.equals(Table.SYSTEM_KS))
+            if (!cmd.ksName.equals(Keyspace.SYSTEM_KS))
                 return false;
         return true;
     }
@@ -1069,7 +1069,7 @@ public class StorageProxy implements StorageProxyMBean
     public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
     throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException, WriteTimeoutException
     {
-        if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands))
+        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
         {
             readMetrics.unavailables.mark();
             ClientRequestMetrics.readUnavailables.inc();
@@ -1087,13 +1087,13 @@ public class StorageProxy implements StorageProxyMBean
                     throw new InvalidRequestException("SERIAL consistency may only be requested for one row at a time");
 
                 ReadCommand command = commands.get(0);
-                CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+                CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
 
                 long start = System.nanoTime();
                 long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
                 while (true)
                 {
-                    Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.table, command.key);
+                    Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key);
                     List<InetAddress> liveEndpoints = p.left;
                     int requiredParticipants = p.right;
 
@@ -1192,7 +1192,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadRepairMetrics.repairedBlocking.mark();
 
                     // Do a full data read to resolve the correct response (and repair node that need be)
-                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
+                    RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp);
                     ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
 
                     if (repairCommands == null)
@@ -1242,7 +1242,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
                     catch (TimeoutException e)
                     {
-                        int blockFor = consistency_level.blockFor(Table.open(command.getKeyspace()));
+                        int blockFor = consistency_level.blockFor(Keyspace.open(command.getKeyspace()));
                         throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
                     }
 
@@ -1284,8 +1284,8 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            Table table = Table.open(command.table);
-            Row r = command.getRow(table);
+            Keyspace keyspace = Keyspace.open(command.ksName);
+            Row r = command.getRow(keyspace);
             ReadResponse result = ReadVerbHandler.getResponse(command, r);
             MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             handler.response(result);
@@ -1313,14 +1313,14 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
+    public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key)
     {
-        return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
+        return getLiveSortedEndpoints(keyspace, StorageService.instance.getPartitioner().decorateKey(key));
     }
 
-    private static List<InetAddress> getLiveSortedEndpoints(Table table, RingPosition pos)
+    private static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos)
     {
-        List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(table, pos);
+        List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
         DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
         return liveEndpoints;
     }
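
The renamed overloads keep the old behaviour: fetch the live natural endpoints for the position, then let the snitch order them by proximity to this coordinator. A hypothetical call site (the keyspace name and row key are made up):

    Keyspace ks = Keyspace.open("my_ks");
    List<InetAddress> replicas = StorageProxy.getLiveSortedEndpoints(ks, ByteBufferUtil.bytes("some-row-key"));
    // replicas.get(0) is the live replica the snitch ranks closest to this node
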
@@ -1343,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Determining replicas to query");
         long startTime = System.nanoTime();
 
-        Table table = Table.open(command.keyspace);
+        Keyspace keyspace = Keyspace.open(command.keyspace);
         List<Row> rows;
         // now scan until we have enough results
         try
@@ -1361,10 +1361,10 @@ public class StorageProxy implements StorageProxyMBean
                                                   ? ranges.get(i)
                                                   : nextRange;
                 List<InetAddress> liveEndpoints = nextEndpoints == null
-                                                ? getLiveSortedEndpoints(table, range.right)
+                                                ? getLiveSortedEndpoints(keyspace, range.right)
                                                 : nextEndpoints;
                 List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
-                                                    ? consistency_level.filterForQuery(table, liveEndpoints)
+                                                    ? consistency_level.filterForQuery(keyspace, liveEndpoints)
                                                     : nextFilteredEndpoints;
                 ++i;
 
@@ -1374,8 +1374,8 @@ public class StorageProxy implements StorageProxyMBean
                 while (i < ranges.size())
                 {
                     nextRange = ranges.get(i);
-                    nextEndpoints = getLiveSortedEndpoints(table, nextRange.right);
-                    nextFilteredEndpoints = consistency_level.filterForQuery(table, nextEndpoints);
+                    nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
+                    nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 
                     /*
                      * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
@@ -1390,10 +1390,10 @@ public class StorageProxy implements StorageProxyMBean
                     List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 
                     // Check if there is enough endpoint for the merge to be possible.
-                    if (!consistency_level.isSufficientLiveNodes(table, merged))
+                    if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
                         break;
 
-                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(table, merged);
+                    List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 
                     // Estimate whether merging will be a win or not
                     if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
@@ -1443,7 +1443,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     logger.debug("Range slice timeout: {}", ex.toString());
                     // We actually got all response at that point
-                    int blockFor = consistency_level.blockFor(table);
+                    int blockFor = consistency_level.blockFor(keyspace);
                     throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
                 }
                 catch (DigestMismatchException e)