You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/09 20:11:08 UTC
svn commit: r1069035 -
/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Author: gdusbabek
Date: Wed Feb 9 19:11:08 2011
New Revision: 1069035
URL: http://svn.apache.org/viewvc?rev=1069035&view=rev
Log:
cache versioned messages in StorageProxy. patch by gdusbabek, reviewed by jbellis. CASSANDRA-2140
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1069035&r1=1069034&r2=1069035&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb 9 19:11:08 2011
@@ -30,6 +30,8 @@ import javax.management.ObjectName;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.net.CacheingMessageProducer;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -206,6 +208,7 @@ public class StorageProxy implements Sto
{
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+ MessageProducer prod = new CacheingMessageProducer(rm);
for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
{
@@ -224,8 +227,6 @@ public class StorageProxy implements Sto
else
{
// belongs on a different server
- // TODO re-use Message objects
- Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
if (logger.isDebugEnabled())
logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
@@ -236,7 +237,7 @@ public class StorageProxy implements Sto
dcMessages.put(dc, messages);
}
- messages.put(unhintedMessage, destination);
+ messages.put(prod.getMessage(Gossiper.instance.getVersion(destination)), destination);
}
}
else
@@ -280,7 +281,6 @@ public class StorageProxy implements Sto
Message message = messages.getKey();
// a single message object is used for unhinted writes, so clean out any forwards
// from previous loop iterations
- // TODO this is currently a no-op until re-use Message object TODOs are fixed
message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
@@ -558,6 +558,7 @@ public class StorageProxy implements Sto
// We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.
+ MessageProducer prod = new CacheingMessageProducer(digestCommand);
for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -568,11 +569,9 @@ public class StorageProxy implements Sto
}
else
{
- // TODO re-use Message objects
- Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
if (logger.isDebugEnabled())
logger.debug("reading digest for " + command + " from " + digestPoint);
- MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
+ MessagingService.instance().sendRR(prod, digestPoint, handler);
}
}
@@ -671,9 +670,9 @@ public class StorageProxy implements Sto
{
ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
- // TODO should re-use Message objects
+ MessageProducer prod = new CacheingMessageProducer(command);
for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(command, endpoint, handler);
+ MessagingService.instance().sendRR(prod, endpoint, handler);
return handler;
}
@@ -726,12 +725,11 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
+ MessageProducer prod = new CacheingMessageProducer(c2);
// TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
- // TODO re-use Message objects
- Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
- MessagingService.instance().sendRR(message, endpoint, handler);
+ MessagingService.instance().sendRR(prod, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " + endpoint);
}
@@ -1013,11 +1011,10 @@ public class StorageProxy implements Sto
throw new UnavailableException();
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
+ MessageProducer prod = new CacheingMessageProducer(command);
for (InetAddress endpoint : liveEndpoints)
{
- // TODO re-use Message objects
- Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
- MessagingService.instance().sendRR(message, endpoint, handler);
+ MessagingService.instance().sendRR(prod, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " + endpoint);
}
@@ -1101,12 +1098,11 @@ public class StorageProxy implements Sto
// Send out the truncate calls and track the responses with the callbacks.
logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
- Truncation truncation = new Truncation(keyspace, cfname);
+ final Truncation truncation = new Truncation(keyspace, cfname);
+ MessageProducer prod = new CacheingMessageProducer(truncation);
for (InetAddress endpoint : allEndpoints)
{
- // TODO re-use Message objects
- Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
- MessagingService.instance().sendRR(message, endpoint, responseHandler);
+ MessagingService.instance().sendRR(prod, endpoint, responseHandler);
}
// Wait for all