You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 06:49:04 UTC
svn commit: r888707 - in /incubator/cassandra/trunk: CHANGES.txt
src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Wed Dec 9 05:49:03 2009
New Revision: 888707
URL: http://svn.apache.org/viewvc?rev=888707&view=rev
Log:
support ConsistencyLevel.ALL on read. patch by jbellis; reviewed by goffinet for CASSANDRA-584
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec 9 05:49:03 2009
@@ -19,6 +19,7 @@
* increase failure conviction threshold, resulting in less nodes
incorrectly (and temporarily) marked as down (CASSANDRA-610)
* respect memtable thresholds during log replay (CASSANDRA-609)
+ * support ConsistencyLevel.ALL on read (CASSANDRA-584)
0.5.0 beta
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec 9 05:49:03 2009
@@ -407,6 +407,7 @@
List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
List<Row> rows = new ArrayList<Row>();
+ int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
int commandIndex = 0;
for (ReadCommand command: commands)
@@ -419,28 +420,24 @@
Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
- List<InetAddress> endpointList = StorageService.instance().getNaturalEndpoints(command.key);
+ List<InetAddress> endpointList = StorageService.instance().getLiveNaturalEndpoints(command.key);
+ if (endpointList.size() < responseCount)
+ throw new UnavailableException();
InetAddress[] endPoints = new InetAddress[endpointList.size()];
Message messages[] = new Message[endpointList.size()];
- /*
- * data-request message is sent to dataPoint, the node that will actually get
- * the data for us. The other replicas are only sent a digest query.
- */
+ // data-request message is sent to dataPoint, the node that will actually get
+ // the data for us. The other replicas are only sent a digest query.
int n = 0;
for (InetAddress endpoint : endpointList)
{
- if (!FailureDetector.instance().isAlive(endpoint))
- continue;
Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
endPoints[n] = endpoint;
messages[n++] = m;
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
- if (n < DatabaseDescriptor.getQuorum())
- throw new UnavailableException();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum()));
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);