You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/27 17:53:44 UTC

svn commit: r1064193 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Thu Jan 27 16:53:44 2011
New Revision: 1064193

URL: http://svn.apache.org/viewvc?rev=1064193&view=rev
Log:
reduce garbage generated by MessagingService to prevent load spikes
patch by jbellis; reviewed by brandonwilliams and tjake for CASSANDRA-2058

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jan 27 16:53:44 2011
@@ -1,3 +1,8 @@
+0.6.11
+ * reduce garbage generated by MessagingService to prevent load spikes
+   (CASSANDRA-2058)
+
+
 0.6.10
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jan 27 16:53:44 2011
@@ -130,7 +130,7 @@ public class HintedHandOffManager
                 rm.add(cf);
                 Message message = rm.makeRowMutationMessage();
                 WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
-                MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
+                MessagingService.instance.sendRR(message, endPoint, responseHandler);
                 try
                 {
                     responseHandler.get();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java Thu Jan 27 16:53:44 2011
@@ -121,8 +121,6 @@ class AsyncResult implements IAsyncResul
         {
             lock_.unlock();
         }        
-
-        MessagingService.removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java Thu Jan 27 16:53:44 2011
@@ -95,11 +95,6 @@ public class Header
         return messageId_;
     }
 
-    void setMessageId(String id)
-    {
-        messageId_ = id;
-    }
-    
     byte[] getDetail(Object key)
     {
         return details_.get(key);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java Thu Jan 27 16:53:44 2011
@@ -93,11 +93,6 @@ public class Message
         return header_.getMessageId();
     }
 
-    void setMessageId(String id)
-    {
-        header_.setMessageId(id);
-    }    
-
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
     public Message getReply(InetAddress from, byte[] args)
     {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Jan 27 16:53:44 2011
@@ -29,7 +29,6 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -45,12 +44,12 @@ import org.apache.cassandra.locator.ILat
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class MessagingService
 {
@@ -62,8 +61,7 @@ public class MessagingService
     public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private static ExpiringMap<String, IMessageCallback> callbacks;
-    private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
+    private static ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks;
 
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -103,21 +101,16 @@ public class MessagingService
         listenGate = new SimpleCondition();
         verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
 
-        Function<String, ?> timeoutReporter = new Function<String, Object>()
+        Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> timeoutReporter = new Function<Pair<String, Pair<InetAddress, IMessageCallback>>, Object>()
         {
-            public Object apply(String messageId)
+            public Object apply(Pair<String, Pair<InetAddress, IMessageCallback>> pair)
             {
-                Collection<InetAddress> addresses = targets.remove(messageId);
-                if (addresses == null)
-                    return null;
-
-                for (InetAddress address : addresses)
-                    addLatency(address, (double) DatabaseDescriptor.getRpcTimeout());
-
+                Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
+                maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+        callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
 
         defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
@@ -133,6 +126,18 @@ public class MessagingService
         timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
     }
 
+    /**
+     * Track latency information for the dynamic snitch
+     * @param cb: the callback associated with this message -- this lets us know if it's a message type we're interested in
+     * @param address: the host that replied to the message
+     * @param latency
+     */
+    public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
+    {
+        if (cb instanceof QuorumResponseHandler || cb instanceof AsyncResult)
+            addLatency(address, latency);
+    }
+
     public void addLatency(InetAddress address, double latency)
     {
         for (ILatencySubscriber subscriber : subscribers)
@@ -227,49 +232,9 @@ public class MessagingService
         return verbHandlers_.get(type);
     }
 
-    /**
-     * Send a message to a given endpoint.
-     * @param message message to be sent.
-     * @param to endpoint to which the message needs to be sent
-     * @return an reference to an IAsyncResult which can be queried for the
-     * response
-     */
-    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
-    {
-        String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        for (InetAddress endpoint : to)
-        {
-            putTarget(messageId, endpoint);
-            sendOneWay(message, endpoint);
-        }
-        return messageId;
-    }
-
-    private static void putTarget(String messageId, InetAddress endpoint)
-    {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        if (addresses == null)
-        {
-            addresses = new NonBlockingHashSet<InetAddress>();
-            Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
-            if (oldAddresses != null)
-                addresses = oldAddresses;
-        }
-        addresses.add(endpoint);
-    }
-
-    private static void removeTarget(String messageId, InetAddress from)
-    {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        // null is expected if we removed the callback or we got a reply after its timeout expired
-        if (addresses != null)
-            addresses.remove(from);
-    }
-
-    public void addCallback(IAsyncCallback cb, String messageId)
+    private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
     {
-        callbacks.put(messageId, cb);
+        callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
     }
 
     /**
@@ -285,43 +250,12 @@ public class MessagingService
     public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
     {        
         String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        putTarget(messageId, to);
+        addCallback(cb, messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
 
     /**
-     * Send a message to a given endpoint. The ith element in the <code>messages</code>
-     * array is sent to the ith element in the <code>to</code> array.This method assumes
-     * there is a one-one mapping between the <code>messages</code> array and
-     * the <code>to</code> array. Otherwise an  IllegalArgumentException will be thrown.
-     * This method also informs the MessagingService to wait for at least
-     * <code>howManyResults</code> responses to determine success of failure.
-     * @param messages messages to be sent.
-     * @param to endpoints to which the message needs to be sent
-     * @param cb callback interface which is used to pass the responses or
-     *           suggest that a timeout occured to the invoker of the send().
-     * @return an reference to message id used to match with the result
-     */
-    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
-    {
-        if ( messages.length != to.length )
-        {
-            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
-        }
-        String groupId = GuidGenerator.guid();
-        addCallback(cb, groupId);
-        for ( int i = 0; i < messages.length; ++i )
-        {
-            messages[i].setMessageId(groupId);
-            putTarget(groupId, to[i]);
-            sendOneWay(messages[i], to[i]);
-        }
-        return groupId;
-    } 
-    
-    /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
      * style messaging.
      * @param message messages to be sent.
@@ -368,8 +302,7 @@ public class MessagingService
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        callbacks.put(message.getMessageId(), iar);
-        putTarget(message.getMessageId(), to);
+        addCallback(iar, message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -443,14 +376,8 @@ public class MessagingService
         }
     }
 
-    public static IMessageCallback getRegisteredCallback(String messageId)
-    {
-        return callbacks.get(messageId);
-    }
-    
-    public static IMessageCallback removeRegisteredCallback(String messageId)
+    public static Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String messageId)
     {
-        targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
         return callbacks.remove(messageId);
     }
 
@@ -459,11 +386,6 @@ public class MessagingService
         return callbacks.getAge(messageId);
     }
 
-    public static void responseReceivedFrom(String messageId, InetAddress from)
-    {
-        removeTarget(messageId, from);
-    }
-
     public static void validateMagic(int magic) throws IOException
     {
         if (magic != PROTOCOL_MAGIC)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jan 27 16:53:44 2011
@@ -18,8 +18,12 @@
 
 package org.apache.cassandra.net;
 
+import java.net.InetAddress;
+
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.utils.Pair;
+
 public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
@@ -27,14 +31,13 @@ public class ResponseVerbHandler impleme
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
-        MessagingService.responseReceivedFrom(messageId, message.getFrom());
         double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
-        IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
-        if (cb == null)
+        Pair<InetAddress, IMessageCallback> pair = MessagingService.removeRegisteredCallback(messageId);
+        if (pair == null)
             return;
 
-        // if cb is not null, then age will be valid
-        MessagingService.instance.addLatency(message.getFrom(), age);
+        IMessageCallback cb = pair.right;
+        MessagingService.instance.maybeAddLatency(cb, message.getFrom(), age);
 
         if (cb instanceof IAsyncCallback)
         {
@@ -49,5 +52,4 @@ public class ResponseVerbHandler impleme
             ((IAsyncResult) cb).result(message);
         }
     }
-
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Jan 27 16:53:44 2011
@@ -84,15 +84,16 @@ class ConsistencyChecker implements Runn
         ReadCommand readCommandDigestOnly = constructReadMessage(true);
 		try
 		{
-			Message message = readCommandDigestOnly.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-
-            MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
+            DigestResponseHandler handler = new DigestResponseHandler();
             for (InetAddress endpoint : replicas_)
             {
                 if (!endpoint.equals(dataSource))
-                    MessagingService.instance.sendOneWay(message, endpoint);
+                {
+                    Message message = readCommandDigestOnly.makeReadMessage();
+                    if (logger_.isDebugEnabled())
+                      logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
+                    MessagingService.instance.sendRR(message, endpoint, handler);
+                }
             }
 		}
 		catch (IOException ex)
@@ -128,14 +129,16 @@ class ConsistencyChecker implements Runn
                 if (!Arrays.equals(dataDigest, digest))
                 {
                     ReadCommand readCommand = constructReadMessage(false);
-                    Message message = readCommand.makeReadMessage();
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-                    MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
+                    DataRepairHandler handler = new DataRepairHandler();
                     for (InetAddress endpoint : replicas_)
                     {
                         if (!endpoint.equals(dataSource))
-                            MessagingService.instance.sendOneWay(message, endpoint);
+                        {
+                            Message message = readCommand.makeReadMessage();
+                            if (logger_.isDebugEnabled())
+                                logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
+                            MessagingService.instance.sendRR(message, endpoint, handler);
+                        }
                     }
 
                     repairInvoked = true;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Jan 27 16:53:44 2011
@@ -49,35 +49,25 @@ public class QuorumResponseHandler<T> im
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
         try
         {
-            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
-            boolean success;
-            try
-            {
-                success = condition.await(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (InterruptedException ex)
-            {
-                throw new AssertionError(ex);
-            }
-
-            if (!success)
-            {
-                StringBuilder sb = new StringBuilder("");
-                for (Message message : responses)
-                {
-                    sb.append(message.getFrom());
-                }
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
-            }
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
         }
-        finally
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+
+        if (!success)
         {
-            for (Message response : responses)
+            StringBuilder sb = new StringBuilder("");
+            for (Message message : responses)
             {
-                MessagingService.removeRegisteredCallback(response.getMessageId());
+                sb.append(message.getFrom());
             }
+            throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
         }
 
         return responseResolver.resolve(responses);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Thu Jan 27 16:53:44 2011
@@ -210,7 +210,6 @@ public class StorageProxy implements Sto
                 // send out the writes, as in mutate() above, but this time with a callback that tracks responses
                 final WriteResponseHandler responseHandler = ss.getWriteResponseHandler(blockFor, consistency_level, table);
                 responseHandlers.add(responseHandler);
-                Message unhintedMessage = null;
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
                 {
                     InetAddress destination = entry.getKey();
@@ -226,14 +225,10 @@ public class StorageProxy implements Sto
                         else
                         {
                             // belongs on a different server.  send it there.
-                            if (unhintedMessage == null)
-                            {
-                                unhintedMessage = rm.makeRowMutationMessage();
-                                MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
-                            }
+                            Message unhintedMessage = rm.makeRowMutationMessage();
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + destination);
-                            MessagingService.instance.sendOneWay(unhintedMessage, destination);
+                            MessagingService.instance.sendRR(unhintedMessage, destination, responseHandler);
                         }
                     }
                     else
@@ -251,8 +246,9 @@ public class StorageProxy implements Sto
                         }
                         // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
                         if (writeEndpoints.contains(destination) || consistency_level == ConsistencyLevel.ANY)
-                            MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
-                        MessagingService.instance.sendOneWay(hintedMessage, destination);
+                            MessagingService.instance.sendRR(hintedMessage, destination, responseHandler);
+                        else
+                            MessagingService.instance.sendOneWay(hintedMessage, destination);
                     }
                 }
             }
@@ -441,17 +437,15 @@ public class StorageProxy implements Sto
     private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-        List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
+        List<List<InetAddress>> commandEndPoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
 
         // send out read requests
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
-            ReadCommand readMessageDigestOnly = command.copy();
-            readMessageDigestOnly.setDigestQuery(true);
-            Message message = command.makeReadMessage();
-            Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+            ReadCommand digestCommand = command.copy();
+            digestCommand.setDigestQuery(true);
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
             List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
@@ -460,24 +454,19 @@ public class StorageProxy implements Sto
             if (endpointList.size() < responseCount)
                 throw new UnavailableException();
 
-            InetAddress[] endPoints = new InetAddress[endpointList.size()];
-            Message messages[] = new Message[endpointList.size()];
+            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, resolver);
             // data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
-            int n = 0;
             for (InetAddress endpoint : endpointList)
             {
-                Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                endPoints[n] = endpoint;
-                messages[n++] = m;
+                Message m = endpoint.equals(dataPoint) ? command.makeReadMessage() : digestCommand.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+                    logger.debug("strongread reading " + (endpoint.equals(dataPoint) ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+                MessagingService.instance.sendRR(m, endpoint, quorumResponseHandler);
             }
-            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, resolver);
-            MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
-            commandEndPoints.add(endPoints);
+            commandEndPoints.add(endpointList);
         }
 
         // read results and make a second pass for any digest mismatches
@@ -506,8 +495,11 @@ public class StorageProxy implements Sto
                     int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(command.table), consistency_level);
                     ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
                     QuorumResponseHandler<Row> qrhRepair = new QuorumResponseHandler<Row>(responseCount, resolver);
-                    Message messageRepair = command.makeReadMessage();
-                    MessagingService.instance.sendRR(messageRepair, commandEndPoints.get(i), qrhRepair);
+                    for (InetAddress endPoint : commandEndPoints.get(i))
+                    {
+                        Message messageRepair = command.makeReadMessage();
+                        MessagingService.instance.sendRR(messageRepair, endPoint, qrhRepair);
+                    }
                     if (repairResponseHandlers == null)
                         repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
                     repairResponseHandlers.add(qrhRepair);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java Thu Jan 27 16:53:44 2011
@@ -55,30 +55,20 @@ public class WriteResponseHandler implem
 
     public void get() throws TimeoutException
     {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
         try
         {
-            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
-            boolean success;
-            try
-            {
-                success = condition.await(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (InterruptedException ex)
-            {
-                throw new AssertionError(ex);
-            }
-
-            if (!success)
-            {
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
-            }
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
         }
-        finally
+
+        if (!success)
         {
-            for (Message response : responses)
-            {
-                MessagingService.removeRegisteredCallback(response.getMessageId());
-            }
+            throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
         }
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1064193&r1=1064192&r2=1064193&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Thu Jan 27 16:53:44 2011
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.utils;
 
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.Callable;
 
 import com.google.common.base.Function;
@@ -33,7 +30,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private final Function<K, ?> postExpireHook;
+    private final Function<Pair<K,V>, ?> postExpireHook;
 
     private static class CacheableObject<T>
     {
@@ -69,18 +66,12 @@ public class ExpiringMap<K, V>
         @Override
         public void run()
         {
-            synchronized (cache)
+            for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
             {
-                Enumeration<K> e = cache.keys();
-                while (e.hasMoreElements())
+                if (entry.getValue().isReadyToDie(expiration))
                 {
-                    K key = e.nextElement();
-                    CacheableObject co = cache.get(key);
-                    if (co != null && co.isReadyToDie(expiration))
-                    {
-                        cache.remove(key);
-                        postExpireHook.apply(key);
-                    }
+                    cache.remove(entry.getKey());
+                    postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
                 }
             }
         }
@@ -99,7 +90,7 @@ public class ExpiringMap<K, V>
      *
      * @param expiration the TTL for objects in the cache in milliseconds
      */
-    public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+    public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
     {
         this.postExpireHook = postExpireHook;
         if (expiration <= 0)