You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/09 20:11:08 UTC

svn commit: r1069035 - /cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Author: gdusbabek
Date: Wed Feb  9 19:11:08 2011
New Revision: 1069035

URL: http://svn.apache.org/viewvc?rev=1069035&view=rev
Log:
Cache versioned messages in StorageProxy. Patch by gdusbabek; reviewed by jbellis for CASSANDRA-2140.

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1069035&r1=1069034&r2=1069035&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb  9 19:11:08 2011
@@ -30,6 +30,8 @@ import javax.management.ObjectName;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.net.CacheingMessageProducer;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -206,6 +208,7 @@ public class StorageProxy implements Sto
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
         Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+        MessageProducer prod = new CacheingMessageProducer(rm);
 
         for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
         {
@@ -224,8 +227,6 @@ public class StorageProxy implements Sto
                 else
                 {
                     // belongs on a different server
-                    // TODO re-use Message objects
-                    Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
                     if (logger.isDebugEnabled())
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
 
@@ -236,7 +237,7 @@ public class StorageProxy implements Sto
                        dcMessages.put(dc, messages);
                     }
 
-                    messages.put(unhintedMessage, destination);
+                    messages.put(prod.getMessage(Gossiper.instance.getVersion(destination)), destination);
                 }
             }
             else
@@ -280,7 +281,6 @@ public class StorageProxy implements Sto
                 Message message = messages.getKey();
                 // a single message object is used for unhinted writes, so clean out any forwards
                 // from previous loop iterations
-                // TODO this is currently a no-op until re-use Message object TODOs are fixed
                 message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
@@ -558,6 +558,7 @@ public class StorageProxy implements Sto
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
+            MessageProducer prod = new CacheingMessageProducer(digestCommand);
             for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -568,11 +569,9 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
-                    // TODO re-use Message objects
-                    Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest for " + command + " from " + digestPoint);
-                    MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
+                    MessagingService.instance().sendRR(prod, digestPoint, handler);
                 }
             }
 
@@ -671,9 +670,9 @@ public class StorageProxy implements Sto
     {
         ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
-        // TODO should re-use Message objects
+        MessageProducer prod = new CacheingMessageProducer(command);
         for (InetAddress endpoint : endpoints)
-            MessagingService.instance().sendRR(command, endpoint, handler);
+            MessagingService.instance().sendRR(prod, endpoint, handler);
         return handler;
     }
 
@@ -726,12 +725,11 @@ public class StorageProxy implements Sto
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
                     ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
+                    MessageProducer prod = new CacheingMessageProducer(c2);
                     // TODO bail early if live endpoints can't satisfy requested consistency level
                     for (InetAddress endpoint : liveEndpoints)
                     {
-                        // TODO re-use Message objects
-                        Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
-                        MessagingService.instance().sendRR(message, endpoint, handler);
+                        MessagingService.instance().sendRR(prod, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + c2 + " from " + endpoint);
                     }
@@ -1013,11 +1011,10 @@ public class StorageProxy implements Sto
                 throw new UnavailableException();
 
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
+            MessageProducer prod = new CacheingMessageProducer(command);
             for (InetAddress endpoint : liveEndpoints)
             {
-                // TODO re-use Message objects
-                Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
-                MessagingService.instance().sendRR(message, endpoint, handler);
+                MessagingService.instance().sendRR(prod, endpoint, handler);
                 if (logger.isDebugEnabled())
                     logger.debug("reading " + command + " from " + endpoint);
             }
@@ -1101,12 +1098,11 @@ public class StorageProxy implements Sto
 
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
-        Truncation truncation = new Truncation(keyspace, cfname);
+        final Truncation truncation = new Truncation(keyspace, cfname);
+        MessageProducer prod = new CacheingMessageProducer(truncation);
         for (InetAddress endpoint : allEndpoints)
         {
-            // TODO re-use Message objects
-            Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
-            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+            MessagingService.instance().sendRR(prod, endpoint, responseHandler);
         }
 
         // Wait for all