You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/22 18:26:39 UTC

git commit: cleanup and simplify; only need to create one MessageOut

Updated Branches:
  refs/heads/trunk 369936c2c -> 62f429337


cleanup and simplify; only need to create one MessageOut


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62f42933
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62f42933
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62f42933

Branch: refs/heads/trunk
Commit: 62f429337caf0aa83b68720a5904e8527b840c80
Parents: 369936c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed May 22 11:26:17 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed May 22 11:26:17 2013 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/net/MessageOut.java  |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |   40 ++++++--------
 2 files changed, 18 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62f42933/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 14ef377..f419d51 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -83,7 +83,7 @@ public class MessageOut<T>
         return new MessageOut<T>(verb, payload, serializer, builder.build());
     }
     
-    public MessageOut withHeaderRemoved(String key)
+    public MessageOut<T> withHeaderRemoved(String key)
     {
         ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
         for (Map.Entry<String, byte[]> entry : parameters.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62f42933/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0e49854..829fc0a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -711,8 +711,8 @@ public class StorageProxy implements StorageProxyMBean
                                              ConsistencyLevel consistency_level)
     throws OverloadedException
     {
-        // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-        Map<String, Multimap<MessageOut, InetAddress>> dcMessages = null;
+        // replicas grouped by datacenter
+        Map<String, Collection<InetAddress>> dcGroups = null;
 
         for (InetAddress destination : targets)
         {
@@ -736,20 +736,17 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     // belongs on a different server
-                    if (logger.isTraceEnabled())
-                        logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
-
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    Multimap<MessageOut, InetAddress> messages = (dcMessages != null) ? dcMessages.get(dc) : null;
+                    Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
                     if (messages == null)
                     {
-                        messages = HashMultimap.create();
-                        if (dcMessages == null)
-                            dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
-                        dcMessages.put(dc, messages);
+                        messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                        if (dcGroups == null)
+                            dcGroups = new HashMap<String, Collection<InetAddress>>();
+                        dcGroups.put(dc, messages);
                     }
 
-                    messages.put(rm.createMessage(), destination);
+                    messages.add(destination);
                 }
             }
             else
@@ -762,21 +759,18 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
-        if (dcMessages != null)
+        if (dcGroups != null)
         {
-            // for each datacenter, send a message to one node to relay the write to other replicas
-            for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
+            MessageOut<RowMutation> message = rm.createMessage();
+            // for each datacenter, send the message to one node to relay the write to other replicas
+            for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
             {
                 boolean isLocalDC = entry.getKey().equals(localDataCenter);
-                for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
-                {
-                    MessageOut message = messages.getKey();
-                    Collection<InetAddress> targets1 = messages.getValue();
-                    // a single message object is used for unhinted writes, so clean out any forwards
-                    // from previous loop iterations
-                    message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-                    sendMessagesToOneDC(message, targets1, isLocalDC, responseHandler);
-                }
+                Collection<InetAddress> dcTargets = entry.getValue();
+                // a single message object is used for unhinted writes, so clean out any forwards
+                // from previous loop iterations
+                message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
+                sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
             }
         }
     }