You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/12 00:36:05 UTC
svn commit: r1057929 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/net/MessagingService.java
Author: jbellis
Date: Tue Jan 11 23:36:05 2011
New Revision: 1057929
URL: http://svn.apache.org/viewvc?rev=1057929&view=rev
Log:
fix race condition in MessagingService.targets
patch by jbellis; reviewed by Folke Behrens for CASSANDRA-1959
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1057929&r1=1057928&r2=1057929&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 11 23:36:05 2011
@@ -18,6 +18,7 @@
* fix CFMetaData.apply to only compare objects of the same class
(CASSANDRA-1962)
* allow specifying specific SSTables to compact from JMX (CASSANDRA-1963)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959)
0.7.0-dev
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057929&r1=1057928&r2=1057929&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:36:05 2011
@@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousClo
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.*;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,6 +59,7 @@ import org.apache.cassandra.utils.Expiri
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
public class MessagingService implements MessagingServiceMBean, ILatencyPublisher
{
@@ -70,7 +72,7 @@ public class MessagingService implements
/* This records all the results mapped by message Id */
private static ExpiringMap<String, IMessageCallback> callbacks;
- private static Multimap<String, InetAddress> targets;
+ private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -127,7 +129,7 @@ public class MessagingService implements
{
public Object apply(String messageId)
{
- Collection<InetAddress> addresses = targets.removeAll(messageId);
+ Collection<InetAddress> addresses = targets.remove(messageId);
if (addresses == null)
return null;
@@ -140,7 +142,6 @@ public class MessagingService implements
return null;
}
};
- targets = ArrayListMultimap.create();
callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -255,12 +256,33 @@ public class MessagingService implements
addCallback(cb, messageId);
for (InetAddress endpoint : to)
{
- targets.put(messageId, endpoint);
+ putTarget(messageId, endpoint);
sendOneWay(message, endpoint);
}
return messageId;
}
+ private void putTarget(String messageId, InetAddress endpoint)
+ {
+ Collection<InetAddress> addresses = targets.get(messageId);
+ if (addresses == null)
+ {
+ addresses = new NonBlockingHashSet<InetAddress>();
+ Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
+ if (oldAddresses != null)
+ addresses = oldAddresses;
+ }
+ addresses.add(endpoint);
+ }
+
+ private static void removeTarget(String messageId, InetAddress from)
+ {
+ Collection<InetAddress> addresses = targets.get(messageId);
+ // null is expected if we removed the callback or we got a reply after its timeout expired
+ if (addresses != null)
+ addresses.remove(from);
+ }
+
public void addCallback(IAsyncCallback cb, String messageId)
{
callbacks.put(messageId, cb);
@@ -280,7 +302,7 @@ public class MessagingService implements
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
- targets.put(messageId, to);
+ putTarget(messageId, to);
sendOneWay(message, to);
return messageId;
}
@@ -307,7 +329,7 @@ public class MessagingService implements
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
- targets.put(groupId, to.get(i));
+ putTarget(groupId, to.get(i));
sendOneWay(messages[i], to.get(i));
}
return groupId;
@@ -361,7 +383,7 @@ public class MessagingService implements
{
IAsyncResult iar = new AsyncResult();
callbacks.put(message.getMessageId(), iar);
- targets.put(message.getMessageId(), to);
+ putTarget(message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
@@ -429,7 +451,7 @@ public class MessagingService implements
public static IMessageCallback removeRegisteredCallback(String messageId)
{
- targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+ targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
return callbacks.remove(messageId);
}
@@ -440,7 +462,7 @@ public class MessagingService implements
public static void responseReceivedFrom(String messageId, InetAddress from)
{
- targets.remove(messageId, from);
+ removeTarget(messageId, from);
}
public static void validateMagic(int magic) throws IOException