You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/17 14:33:54 UTC

svn commit: r881280 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Tue Nov 17 13:33:53 2009
New Revision: 881280

URL: http://svn.apache.org/viewvc?rev=881280&view=rev
Log:
make local insert() skip MessagingService and fix HH write
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-558

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881280&r1=881279&r2=881280&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Nov 17 13:33:53 2009
@@ -19,6 +19,7 @@
 
 import java.io.IOError;
 import java.io.IOException;
+import java.io.IOError;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -71,65 +72,71 @@
     }
 
     /**
-     * This method is responsible for creating Message to be
-     * sent over the wire to N replicas where some of the replicas
-     * may be hints.
-     */
-    private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
-    {
-        Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
-        Message message = rm.makeRowMutationMessage();
-
-        for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
-        {
-            InetAddress target = entry.getKey();
-            InetAddress hintedTarget = entry.getValue();
-            if (target.equals(hintedTarget))
-            {
-                messageMap.put(target, message);
-            }
-            else
-            {
-                Message hintedMessage = rm.makeRowMutationMessage();
-                hintedMessage.addHeader(RowMutation.HINT, hintedTarget.getAddress());
-                if (logger.isDebugEnabled())
-                    logger.debug("Sending the hint of " + hintedTarget + " to " + target);
-                messageMap.put(hintedTarget, hintedMessage);
-            }
-        }
-        return messageMap;
-    }
-    
-    /**
      * Use this method to have this RowMutation applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
-     * the data across to some other replica. 
+     * the data across to some other replica.
+     *
+     * This is the ZERO consistency level. We do not wait for replies.
+     *
      * @param rm the mutation to be applied across the replicas
     */
-    public static void insert(RowMutation rm)
+    public static void insert(final RowMutation rm)
     {
-        /*
-         * Get the N nodes from storage service where the data needs to be
-         * replicated
-         * Construct a message for write
-         * Send them asynchronously to the replicas.
-        */
-
         long startTime = System.currentTimeMillis();
         try
         {
             List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
-            // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
             Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-            Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
-            for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
-            {
-                Message message = entry.getValue();
-                InetAddress endpoint = entry.getKey();
-                if (logger.isDebugEnabled())
-                    logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
-                MessagingService.instance().sendOneWay(message, endpoint);
+            Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes
+
+            // 3 cases:
+            // 1. local, unhinted write: run directly on write stage
+            // 2. non-local, unhinted write: send row mutation message
+            // 3. hinted write: add hint header, and send message
+            for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+            {
+                InetAddress target = entry.getKey();
+                InetAddress hintedTarget = entry.getValue();
+                if (target.equals(hintedTarget))
+                {
+                    if (target.equals(FBUtilities.getLocalAddress()))
+                    {
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing local key " + rm.key());
+                        Runnable runnable = new Runnable()
+                        {
+                            public void run()
+                            {
+                                try
+                                {
+                                    rm.apply();
+                                }
+                                catch (IOException e)
+                                {
+                                    throw new IOError(e);
+                                }
+                            }
+                        };
+                        StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                    }
+                    else
+                    {
+                        if (unhintedMessage == null)
+                            unhintedMessage = rm.makeRowMutationMessage();
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
+                        MessagingService.instance().sendOneWay(unhintedMessage, target);
+                    }
+                }
+                else
+                {
+                    Message hintedMessage = rm.makeRowMutationMessage();
+                    hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
+                    if (logger.isDebugEnabled())
+                        logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
+                    MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                }
             }
         }
         catch (IOException e)