You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/19 05:41:34 UTC

[2/3] git commit: Fix cross-DC mutation forwarding — patch by jbellis; reviewed by dbrosius for CASSANDRA-5632

Fix cross-DC mutation forwarding
patch by jbellis; reviewed by dbrosius for CASSANDRA-5632


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1d7405f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1d7405f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1d7405f

Branch: refs/heads/trunk
Commit: b1d7405fd1263a04d8cc4bbfcba3ec1928b75738
Parents: 26c4262
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 22:20:42 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 22:20:49 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/RowMutationVerbHandler.java    | 11 ++--
 src/java/org/apache/cassandra/db/Table.java     |  4 +-
 .../org/apache/cassandra/net/MessageOut.java    | 11 ----
 .../apache/cassandra/service/StorageProxy.java  | 57 ++++++++------------
 5 files changed, 28 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c48eb7d..5d36bd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Fix cross-DC mutation forwarding (CASSANDRA-5632)
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index c2126f5..eedd134 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -37,13 +37,13 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
         try
         {
             RowMutation rm = message.payload;
-            logger.debug("Applying mutation");
 
             // Check if there were any forwarding headers in this message
-            InetAddress replyTo = message.from;
             byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+            InetAddress replyTo;
             if (from == null)
             {
+                replyTo = message.from;
                 byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
                 if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
                     forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
@@ -73,15 +73,14 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
         DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
         int size = dis.readInt();
 
-        // remove fwds from message to avoid infinite loop
+        // tell the recipients who to send their ack to
         MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
+        // Send a message to each of the addresses on our Forward List
         for (int i = 0; i < size; i++)
         {
-            // Send a message to each of the addresses on our Forward List
             InetAddress address = CompactEndpointSerializationHelper.deserialize(dis);
             String id = dis.readUTF();
-            logger.debug("Forwarding message to {}@{}", id, address);
-            // Let the response go back to the coordinator
+            Tracing.trace("Enqueuing forwarded write to {}", address);
             MessagingService.instance().sendOneWay(message, id, address);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 17c510b..99a3446 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -363,10 +363,8 @@ public class Table
      */
     public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        if (!mutation.getTable().equals(Tracing.TRACE_KS))
-            Tracing.trace("Acquiring switchLock read lock");
-
         // write the mutation to the commitlog and memtables
+        Tracing.trace("Acquiring switchLock read lock");
         switchLock.readLock().lock();
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 14ef377..da9eda4 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -82,17 +82,6 @@ public class MessageOut<T>
         builder.putAll(parameters).put(key, value);
         return new MessageOut<T>(verb, payload, serializer, builder.build());
     }
-    
-    public MessageOut withHeaderRemoved(String key)
-    {
-        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
-        for (Map.Entry<String, byte[]> entry : parameters.entrySet())
-        {
-            if (!entry.getKey().equals(key))
-                builder.put(entry.getKey(), entry.getValue());
-        }
-        return new MessageOut<T>(verb, payload, serializer, builder.build());
-    }
 
     public Stage getStage()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c12cace..ee045eb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -204,14 +204,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             writeMetrics.timeouts.mark();
             ClientRequestMetrics.writeTimeouts.inc();
-            if (logger.isDebugEnabled())
-            {
-                List<String> mstrings = new ArrayList<String>(mutations.size());
-                for (IMutation mutation : mutations)
-                    mstrings.add(mutation.toString(true));
-                logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings);
-            }
-            Tracing.trace("Write timeout");
+            Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
             throw ex;
         }
         catch (UnavailableException e)
@@ -480,8 +473,8 @@ public class StorageProxy implements StorageProxyMBean
                                              ConsistencyLevel consistency_level)
     throws OverloadedException
     {
-        // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-        Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
+        // replicas grouped by datacenter
+        Map<String, Collection<InetAddress>> dcGroups = null;
 
         for (InetAddress destination : targets)
         {
@@ -506,13 +499,15 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     // belongs on a different server
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
-                    if (messages == null)
+                    Collection<InetAddress> dcTargets = (dcGroups != null) ? dcGroups.get(dc) : null;
+                    if (dcTargets == null)
                     {
-                        messages = HashMultimap.create();
-                        dcMessages.put(dc, messages);
+                        dcTargets = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                        if (dcGroups == null)
+                            dcGroups = new HashMap<String, Collection<InetAddress>>();
+                        dcGroups.put(dc, dcTargets);
                     }
-                    messages.put(rm.createMessage(), destination);
+                    dcTargets.add(destination);
                 }
             }
             else
@@ -525,7 +520,17 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
-        sendMessages(localDataCenter, dcMessages, responseHandler);
+        if (dcGroups != null)
+        {
+            MessageOut<RowMutation> message = rm.createMessage();
+            // for each datacenter, send the message to one node to relay the write to other replicas
+            for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
+            {
+                boolean isLocalDC = entry.getKey().equals(localDataCenter);
+                Collection<InetAddress> dcTargets = entry.getValue();
+                sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
+            }
+        }
     }
 
     public static Future<Void> submitHint(final RowMutation mutation,
@@ -580,26 +585,6 @@ public class StorageProxy implements StorageProxyMBean
         totalHints.incrementAndGet();
     }
 
-    /**
-     * for each datacenter, send a message to one node to relay the write to other replicas
-     */
-    private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler)
-    {
-        for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
-        {
-            boolean isLocalDC = entry.getKey().equals(localDataCenter);
-            for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
-            {
-                MessageOut message = messages.getKey();
-                Collection<InetAddress> targets = messages.getValue();
-                // a single message object is used for unhinted writes, so clean out any forwards
-                // from previous loop iterations
-                message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-                sendMessagesToOneDC(message, targets, isLocalDC, handler);
-            }
-        }
-    }
-
     private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
     {
         try