You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/04 22:34:12 UTC

svn commit: r1055187 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Tue Jan  4 21:34:11 2011
New Revision: 1055187

URL: http://svn.apache.org/viewvc?rev=1055187&view=rev
Log:
fix batch mutations post-#1530
patch by tjake; reviewed by jbellis for CASSANDRA-1931

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055187&r1=1055186&r2=1055187&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan  4 21:34:11 2011
@@ -123,7 +123,7 @@ public class StorageProxy implements Sto
                 responseHandlers.add(responseHandler);
                 
                 // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-                Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10);
+                Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
                 Message unhintedMessage = null;
 
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
@@ -150,7 +150,16 @@ public class StorageProxy implements Sto
                             }
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
-                            dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination));
+                            
+                            
+                            Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                            if (messages == null)
+                            {
+                               messages = HashMultimap.create();
+                               dcMessages.put(dc, messages);
+                            }
+                            
+                            messages.put(unhintedMessage, destination);
                         }
                     }
                     else
@@ -167,7 +176,16 @@ public class StorageProxy implements Sto
                             }
                         }
                         responseHandler.addHintCallback(hintedMessage, destination);
-                        dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination));
+                        
+                        Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                        
+                        if (messages == null)
+                        {
+                           messages = HashMultimap.create();
+                           dcMessages.put(dc, messages);
+                        }
+                        
+                        messages.put(hintedMessage, destination);
                     }
                 }
 
@@ -194,53 +212,55 @@ public class StorageProxy implements Sto
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
-    private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages)
+    private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages)
     throws IOException
     {
-        for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet())
+        for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
         {
             String dataCenter = entry.getKey();
 
             // Grab a set of all the messages bound for this dataCenter and create an iterator over this set.
-            Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue();
-            Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator();
-            assert iter.hasNext();
-
-            // First endpoint in list is the destination for this group
-            Pair<Message, InetAddress> messageAndDestination = iter.next();
-
-            Message primaryMessage = messageAndDestination.left;
-            InetAddress target = messageAndDestination.right;
+            Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap();
 
-            // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
-            while (iter.hasNext())
+            for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet())
             {
-                messageAndDestination = iter.next();
-                assert messageAndDestination.left == primaryMessage;
+                Message message = messages.getKey();
+                Iterator<InetAddress> iter = messages.getValue().iterator();
+                assert iter.hasNext();
+                
+                // First endpoint in list is the destination for this group
+                InetAddress target = iter.next();
+            
 
-                if (dataCenter.equals(localDataCenter))
+                // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
+                while (iter.hasNext())
                 {
-                    // direct write to local DC
-                    assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
-                    MessagingService.instance().sendOneWay(primaryMessage, target);
-                }
-                else
-                {
-                    // group all nodes in this DC as forward headers on the primary message
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
-
-                    // append to older addresses
-                    byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
-                    if (previousHints != null)
-                        dos.write(previousHints);
+                    InetAddress destination = iter.next();
+
+                    if (dataCenter.equals(localDataCenter))
+                    {
+                        // direct write to local DC
+                        assert message.getHeader(RowMutation.FORWARD_HEADER) == null;
+                        MessagingService.instance().sendOneWay(message, target);
+                    }
+                    else
+                    {
+                        // group all nodes in this DC as forward headers on the primary message
+                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        DataOutputStream dos = new DataOutputStream(bos);
+
+                        // append to older addresses
+                        byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+                        if (previousHints != null)
+                            dos.write(previousHints);
 
-                    dos.write(messageAndDestination.right.getAddress());
-                    primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+                        dos.write(destination.getAddress());
+                        message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+                    }
                 }
+            
+                MessagingService.instance().sendOneWay(message, target);
             }
-
-            MessagingService.instance().sendOneWay(primaryMessage, target);
         }
     }