You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/12 00:36:05 UTC

svn commit: r1057929 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/net/MessagingService.java

Author: jbellis
Date: Tue Jan 11 23:36:05 2011
New Revision: 1057929

URL: http://svn.apache.org/viewvc?rev=1057929&view=rev
Log:
fix race condition in MessagingService.targets
patch by jbellis; reviewed by Folke Behrens for CASSANDRA-1959

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1057929&r1=1057928&r2=1057929&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 11 23:36:05 2011
@@ -18,6 +18,7 @@
  * fix CFMetaData.apply to only compare objects of the same class 
    (CASSANDRA-1962)
  * allow specifying specific SSTables to compact from JMX (CASSANDRA-1963)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959)
 
 
 0.7.0-dev

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057929&r1=1057928&r2=1057929&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:36:05 2011
@@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,6 +59,7 @@ import org.apache.cassandra.utils.Expiri
 import org.apache.cassandra.utils.GuidGenerator;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class MessagingService implements MessagingServiceMBean, ILatencyPublisher
 {
@@ -70,7 +72,7 @@ public class MessagingService implements
 
     /* This records all the results mapped by message Id */
     private static ExpiringMap<String, IMessageCallback> callbacks;
-    private static Multimap<String, InetAddress> targets;
+    private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
 
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -127,7 +129,7 @@ public class MessagingService implements
         {
             public Object apply(String messageId)
             {
-                Collection<InetAddress> addresses = targets.removeAll(messageId);
+                Collection<InetAddress> addresses = targets.remove(messageId);
                 if (addresses == null)
                     return null;
 
@@ -140,7 +142,6 @@ public class MessagingService implements
                 return null;
             }
         };
-        targets = ArrayListMultimap.create();
         callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -255,12 +256,33 @@ public class MessagingService implements
         addCallback(cb, messageId);
         for (InetAddress endpoint : to)
         {
-            targets.put(messageId, endpoint);
+            putTarget(messageId, endpoint);
             sendOneWay(message, endpoint);
         }
         return messageId;
     }
 
+    private void putTarget(String messageId, InetAddress endpoint)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        if (addresses == null)
+        {
+            addresses = new NonBlockingHashSet<InetAddress>();
+            Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
+            if (oldAddresses != null)
+                addresses = oldAddresses;
+        }
+        addresses.add(endpoint);
+    }
+
+    private static void removeTarget(String messageId, InetAddress from)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        // null is expected if we removed the callback or we got a reply after its timeout expired
+        if (addresses != null)
+            addresses.remove(from);
+    }
+
     public void addCallback(IAsyncCallback cb, String messageId)
     {
         callbacks.put(messageId, cb);
@@ -280,7 +302,7 @@ public class MessagingService implements
     {        
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
-        targets.put(messageId, to);
+        putTarget(messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -307,7 +329,7 @@ public class MessagingService implements
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            targets.put(groupId, to.get(i));
+            putTarget(groupId, to.get(i));
             sendOneWay(messages[i], to.get(i));
         }
         return groupId;
@@ -361,7 +383,7 @@ public class MessagingService implements
     {
         IAsyncResult iar = new AsyncResult();
         callbacks.put(message.getMessageId(), iar);
-        targets.put(message.getMessageId(), to);
+        putTarget(message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -429,7 +451,7 @@ public class MessagingService implements
     
     public static IMessageCallback removeRegisteredCallback(String messageId)
     {
-        targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+        targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
         return callbacks.remove(messageId);
     }
 
@@ -440,7 +462,7 @@ public class MessagingService implements
 
     public static void responseReceivedFrom(String messageId, InetAddress from)
     {
-        targets.remove(messageId, from);
+        removeTarget(messageId, from);
     }
 
     public static void validateMagic(int magic) throws IOException