You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/26 23:23:17 UTC

svn commit: r979449 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Mon Jul 26 21:23:17 2010
New Revision: 979449

URL: http://svn.apache.org/viewvc?rev=979449&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-979402
+/cassandra/branches/cassandra-0.6:922689-979441
 /cassandra/trunk:978791
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=979449&r1=979448&r2=979449&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul 26 21:23:17 2010
@@ -65,6 +65,8 @@ dev
    (CASSANDRA-1280, CASSANDRA-1047)
  * log thread pool stats when GC is excessive (CASSANDRA-1275)
  * remove gossip message size limit (CASSANDRA-1138)
+ * parallelize local and remote reads during multiget, and respect snitch 
+   when determining whether to do local read for CL.ONE (CASSANDRA-1317)
 
 
 0.6.3

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-979441
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-979441
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-979441
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-979441
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-979441
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979449&r1=979448&r2=979449&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 26 21:23:17 2010
@@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -51,6 +52,7 @@ import org.apache.cassandra.net.IAsyncRe
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
@@ -284,88 +286,94 @@ public class StorageProxy implements Sto
     }
 
     /**
-     * Read the data from one replica.  When we get
-     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
-     * @param commands a set of commands to perform reads
-     * @return the row associated with command.key
-     * @throws Exception
+     * Performs the actual reading of a row out of the StorageService, fetching
+     * a specific set of column names from a given column family.
      */
-    private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
+    public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
+            throws IOException, UnavailableException, TimeoutException, InvalidRequestException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
-
-        List<Row> rows = new ArrayList<Row>();
-        List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
+        if (StorageService.instance.isBootstrapMode())
+            throw new InvalidRequestException("This node cannot accept reads until it has bootstrapped");
+        long startTime = System.nanoTime();
 
-        for (ReadCommand command: commands)
+        List<Row> rows;
+        if (consistency_level == ConsistencyLevel.ONE)
         {
-            InetAddress endpoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            Message message = command.makeReadMessage();
-
-            if (logger.isDebugEnabled())
-                logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endpoint);
-            if (randomlyReadRepair(command))
-                message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-            iars.add(MessagingService.instance.sendRR(message, endpoint));
+            rows = weakRead(commands);
         }
-
-        for (IAsyncResult iar: iars)
+        else
         {
-            byte[] body;
-            body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-            if (response.row() != null)
-                rows.add(response.row());
+            assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
+            rows = strongRead(commands, consistency_level);
         }
+
+        readStats.addNano(System.nanoTime() - startTime);
         return rows;
     }
 
-    /**
-     * Performs the actual reading of a row out of the StorageService, fetching
-     * a specific set of column names from a given column family.
-     */
-    public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
-            throws IOException, UnavailableException, TimeoutException
+    private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
     {
-        long startTime = System.nanoTime();
-
         List<Row> rows = new ArrayList<Row>();
 
-        if (consistency_level == ConsistencyLevel.ONE)
+        // send off all the commands asynchronously
+        List<Future<Object>> localFutures = null;
+        List<IAsyncResult> remoteResults = null;
+        for (ReadCommand command: commands)
         {
-            List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
-            List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
+            InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
+            if (endPoint.equals(FBUtilities.getLocalAddress()))
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("weakread reading " + command + " locally");
 
-            for (ReadCommand command: commands)
+                if (localFutures == null)
+                    localFutures = new ArrayList<Future<Object>>();
+                Callable<Object> callable = new weakReadLocalCallable(command);
+                localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
+            }
+            else
             {
-                List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key);
-                boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress());
-                //TODO: Throw InvalidRequest if we're in bootstrap mode?
-                if (foundLocal && !StorageService.instance.isBootstrapMode())
+                if (remoteResults == null)
+                    remoteResults = new ArrayList<IAsyncResult>();
+                Message message = command.makeReadMessage();
+                if (logger.isDebugEnabled())
+                    logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+                if (randomlyReadRepair(command))
+                    message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+                remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+            }
+        }
+
+        // wait for results
+        if (localFutures != null)
+        {
+            for (Future<Object> future : localFutures)
+            {
+                Row row;
+                try
                 {
-                    localCommands.add(command);
+                    row = (Row) future.get();
                 }
-                else
+                catch (Exception e)
                 {
-                    remoteCommands.add(command);
+                    throw new RuntimeException(e);
                 }
+                rows.add(row);
             }
-            if (localCommands.size() > 0)
-                rows.addAll(weakReadLocal(localCommands));
-
-            if (remoteCommands.size() > 0)
-                rows.addAll(weakReadRemote(remoteCommands));
         }
-        else
+        if (remoteResults != null)
         {
-            assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
-            rows = strongRead(commands, consistency_level);
+            for (IAsyncResult iar: remoteResults)
+            {
+                byte[] body;
+                body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                if (response.row() != null)
+                    rows.add(response.row());
+            }
         }
 
-        readStats.addNano(System.nanoTime() - startTime);
-
         return rows;
     }
 
@@ -467,31 +475,6 @@ public class StorageProxy implements Sto
     /*
     * This function executes the read protocol locally.  Consistency checks are performed in the background.
     */
-    private static List<Row> weakReadLocal(List<ReadCommand> commands)
-    {
-        List<Row> rows = new ArrayList<Row>();
-        List<Future<Object>> futures = new ArrayList<Future<Object>>();
-
-        for (ReadCommand command: commands)
-        {
-            Callable<Object> callable = new weakReadLocalCallable(command);
-            futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
-        }
-        for (Future<Object> future : futures)
-        {
-            Row row;
-            try
-            {
-                row = (Row) future.get();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-            rows.add(row);
-        }
-        return rows;
-    }
 
     public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
     throws IOException, UnavailableException, TimeoutException