You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/24 18:49:21 UTC

svn commit: r807305 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Mon Aug 24 16:49:21 2009
New Revision: 807305

URL: http://svn.apache.org/viewvc?rev=807305&view=rev
Log:
allow blocking write to create hints if not enough of the "correct" nodes are live, but enough are to fulfil the requested consistency level.  patch by Sandeep Tata; reviewed by jbellis and Michael Greene for CASSANDRA-383

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=807305&r1=807304&r2=807305&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 24 16:49:21 2009
@@ -33,7 +33,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.TimedStatsDeque;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata;
 
 import org.apache.log4j.Logger;
@@ -162,35 +161,31 @@
         }
         try
         {
-            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-            if (endpoints.length < (DatabaseDescriptor.getReplicationFactor() / 2) + 1)
+            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+            int blockFor = determineBlockFor(consistency_level);
+            List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+            if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
             {
                 throw new UnavailableException();
             }
-            int blockFor;
-            if (consistency_level == ConsistencyLevel.ONE)
-            {
-                blockFor = 1;
-            }
-            else if (consistency_level == ConsistencyLevel.QUORUM)
-            {
-                blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;
-            }
-            else if (consistency_level == ConsistencyLevel.ALL)
-            {
-                blockFor = DatabaseDescriptor.getReplicationFactor();
-            }
-            else
-            {
-                throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
-            }
             QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
             if (logger.isDebugEnabled())
-                logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
+                logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
 
-            MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
+            // Get all the targets and stick them in an array
+            MessagingService.getMessagingInstance().sendRR(message, primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
             if (!quorumResponseHandler.get())
                 throw new UnavailableException();
+            if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with Hinted Handoff?
+            {
+                for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+                {
+                    if (e.getKey() != e.getValue()) // Hinted Handoff to target
+                    {
+                        MessagingService.getMessagingInstance().sendOneWay(message, e.getKey());
+                    }
+                }
+            }
         }
         catch (Exception e)
         {
@@ -203,6 +198,41 @@
         }
     }
 
+    private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> endpointMap)
+    {
+        List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(endpointMap.size());
+        for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+        {
+            if (e.getKey() == e.getValue())
+            {
+                liveEndPoints.add(e.getKey());
+            }
+        }
+        return liveEndPoints;
+    }
+
+    private static int determineBlockFor(int consistency_level)
+    {
+        int blockFor;
+        if (consistency_level == ConsistencyLevel.ONE)
+        {
+            blockFor = 1;
+        }
+        else if (consistency_level == ConsistencyLevel.QUORUM)
+        {
+            blockFor = (DatabaseDescriptor.getReplicationFactor() / 2) + 1;
+        }
+        else if (consistency_level == ConsistencyLevel.ALL)
+        {
+            blockFor = DatabaseDescriptor.getReplicationFactor();
+        }
+        else
+        {
+            throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
+        }
+        return blockFor;
+    }
+
     public static void insertBlocking(RowMutation rm) throws UnavailableException
     {
         insertBlocking(rm, ConsistencyLevel.QUORUM);