You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/24 18:49:21 UTC
svn commit: r807305 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Mon Aug 24 16:49:21 2009
New Revision: 807305
URL: http://svn.apache.org/viewvc?rev=807305&view=rev
Log:
allow blocking write to create hints if not enough of the "correct" nodes are live, but enough are to fulfil the requested consistency level. patch by Sandeep Tata; reviewed by jbellis and Michael Greene for CASSANDRA-383
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=807305&r1=807304&r2=807305&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 24 16:49:21 2009
@@ -33,7 +33,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.TimedStatsDeque;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.log4j.Logger;
@@ -162,35 +161,31 @@
}
try
{
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
- if (endpoints.length < (DatabaseDescriptor.getReplicationFactor() / 2) + 1)
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ int blockFor = determineBlockFor(consistency_level);
+ List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+ if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
{
throw new UnavailableException();
}
- int blockFor;
- if (consistency_level == ConsistencyLevel.ONE)
- {
- blockFor = 1;
- }
- else if (consistency_level == ConsistencyLevel.QUORUM)
- {
- blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;
- }
- else if (consistency_level == ConsistencyLevel.ALL)
- {
- blockFor = DatabaseDescriptor.getReplicationFactor();
- }
- else
- {
- throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
- }
QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
if (logger.isDebugEnabled())
- logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
+ logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
- MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
+ // Get all the targets and stick them in an array
+ MessagingService.getMessagingInstance().sendRR(message, primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
if (!quorumResponseHandler.get())
throw new UnavailableException();
+ if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with Hinted Handoff?
+ {
+ for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ {
+ if (e.getKey() != e.getValue()) // Hinted Handoff to target
+ {
+ MessagingService.getMessagingInstance().sendOneWay(message, e.getKey());
+ }
+ }
+ }
}
catch (Exception e)
{
@@ -203,6 +198,41 @@
}
}
+ private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> endpointMap)
+ {
+ List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(endpointMap.size());
+ for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ {
+ if (e.getKey() == e.getValue())
+ {
+ liveEndPoints.add(e.getKey());
+ }
+ }
+ return liveEndPoints;
+ }
+
+ private static int determineBlockFor(int consistency_level)
+ {
+ int blockFor;
+ if (consistency_level == ConsistencyLevel.ONE)
+ {
+ blockFor = 1;
+ }
+ else if (consistency_level == ConsistencyLevel.QUORUM)
+ {
+ blockFor = (DatabaseDescriptor.getReplicationFactor() / 2) + 1;
+ }
+ else if (consistency_level == ConsistencyLevel.ALL)
+ {
+ blockFor = DatabaseDescriptor.getReplicationFactor();
+ }
+ else
+ {
+ throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
+ }
+ return blockFor;
+ }
+
public static void insertBlocking(RowMutation rm) throws UnavailableException
{
insertBlocking(rm, ConsistencyLevel.QUORUM);