You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 06:49:04 UTC

svn commit: r888707 - in /incubator/cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Wed Dec  9 05:49:03 2009
New Revision: 888707

URL: http://svn.apache.org/viewvc?rev=888707&view=rev
Log:
support ConsistencyLevel.ALL on read.  patch by jbellis; reviewed by goffinet for CASSANDRA-584

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec  9 05:49:03 2009
@@ -19,6 +19,7 @@
  * increase failure conviction threshold, resulting in less nodes
    incorrectly (and temporarily) marked as down (CASSANDRA-610)
  * respect memtable thresholds during log replay (CASSANDRA-609)
+ * support ConsistencyLevel.ALL on read (CASSANDRA-584)
 
 
 0.5.0 beta

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec  9 05:49:03 2009
@@ -407,6 +407,7 @@
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
         List<Row> rows = new ArrayList<Row>();
 
+        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
         int commandIndex = 0;
 
         for (ReadCommand command: commands)
@@ -419,28 +420,24 @@
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
-            List<InetAddress> endpointList = StorageService.instance().getNaturalEndpoints(command.key);
+            List<InetAddress> endpointList = StorageService.instance().getLiveNaturalEndpoints(command.key);
+            if (endpointList.size() < responseCount)
+                throw new UnavailableException();
 
             InetAddress[] endPoints = new InetAddress[endpointList.size()];
             Message messages[] = new Message[endpointList.size()];
-            /*
-             * data-request message is sent to dataPoint, the node that will actually get
-             * the data for us. The other replicas are only sent a digest query.
-            */
+            // data-request message is sent to dataPoint, the node that will actually get
+            // the data for us. The other replicas are only sent a digest query.
             int n = 0;
             for (InetAddress endpoint : endpointList)
             {
-                if (!FailureDetector.instance().isAlive(endpoint))
-                    continue;
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
                 endPoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            if (n < DatabaseDescriptor.getQuorum())
-                throw new UnavailableException();
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum()));
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
             MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);