You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/08 18:04:20 UTC
svn commit: r1166774 - /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Thu Sep 8 16:04:20 2011
New Revision: 1166774
URL: http://svn.apache.org/viewvc?rev=1166774&view=rev
Log:
Improve caching of same-version Messages on digest and repair paths
patch by jbellis; reviewed by slebresne for CASSANDRA-3158
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1166774&r1=1166773&r2=1166774&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Sep 8 16:04:20 2011
@@ -516,50 +516,45 @@ public class StorageProxy implements Sto
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
+ readCallbacks.add(handler);
- // The data-request message is sent to dataPoint, the node that will actually get
- // the data for us. The other replicas are only sent a digest query.
- ReadCommand digestCommand = null;
- if (handler.endpoints.size() > 1)
- {
- digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- }
-
+ // The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getLocalAddress()))
{
- if (logger.isDebugEnabled())
- logger.debug("reading data locally");
+ logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
- if (logger.isDebugEnabled())
- logger.debug("reading data from " + dataPoint);
+ logger.debug("reading data from {}", dataPoint);
MessagingService.instance().sendRR(command, dataPoint, handler);
}
- // We lazy-construct the digest Message object since it may not be necessary if we
- // are doing a local digest read, or no digest reads at all.
- MessageProducer producer = new CachingMessageProducer(digestCommand);
+ if (handler.endpoints.size() == 1)
+ continue;
+
+ // send the other endpoints a digest request
+ ReadCommand digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ MessageProducer producer = null;
for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
{
- if (logger.isDebugEnabled())
- logger.debug("reading digest locally");
+ logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
- if (logger.isDebugEnabled())
- logger.debug("reading digest for from " + digestPoint);
+ logger.debug("reading digest from {}", digestPoint);
+ // (We lazy-construct the digest Message object since it may not be necessary if we
+ // are doing a local digest read, or no digest reads at all.)
+ if (producer == null)
+ producer = new CachingMessageProducer(digestCommand);
MessagingService.instance().sendRR(producer, digestPoint, handler);
}
}
-
- readCallbacks.add(handler);
}
// read results and make a second pass for any digest mismatches
@@ -591,8 +586,9 @@ public class StorageProxy implements Sto
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+ MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
- MessagingService.instance().sendRR(command, endpoint, repairHandler);
+ MessagingService.instance().sendRR(producer, endpoint, repairHandler);
if (repairResponseHandlers == null)
repairResponseHandlers = new ArrayList<RepairCallback<Row>>();